Merge branch 'fix/query' of https://github.com/taosdata/TDengine into fix

This commit is contained in:
yihaoDeng 2021-04-10 01:46:57 +08:00
commit adf9437093
54 changed files with 6850 additions and 1446 deletions

View File

@ -96,6 +96,25 @@ typedef struct STableMetaInfo {
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
} SColumnIndex;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
TAOS_FIELD* final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SColumn {
SColumnIndex colIndex;
int32_t numOfFilters;
SColumnFilterInfo *filterInfo;
} SColumn;
/* the structure for sql function in select clause */
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
@ -109,32 +128,24 @@ typedef struct SSqlExpr {
tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id
SColumn *pFilter; // expr filter
} SSqlExpr;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
} SColumnIndex;
typedef struct SExprFilter {
tSqlExpr *pExpr; //used for having parse
SSqlExpr *pSqlExpr;
SArray *fp;
SColumn *pFilters; //having filter info
}SExprFilter;
typedef struct SInternalField {
TAOS_FIELD field;
bool visible;
SExprInfo *pArithExprInfo;
SSqlExpr *pSqlExpr;
SExprFilter *pFieldFilters;
} SInternalField;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
TAOS_FIELD* final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SColumn {
SColumnIndex colIndex;
int32_t numOfFilters;
SColumnFilterInfo *filterInfo;
} SColumn;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
@ -243,6 +254,7 @@ typedef struct SQueryInfo {
int32_t round; // 0/1/....
int32_t bufLen;
char* buf;
int32_t havingFieldNum;
} SQueryInfo;
typedef struct {
@ -316,6 +328,7 @@ typedef struct {
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex* pColumnIndex;
TAOS_FIELD* final;
SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions
struct SLocalMerger *pLocalMerger;
} SSqlRes;

View File

@ -22,6 +22,7 @@
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "qUtil.h"
typedef struct SCompareParam {
SLocalDataSource **pLocalData;
@ -1243,6 +1244,76 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
return false;
}
bool doFilterFieldData(char *input, SExprFilter* pFieldFilters, int16_t type, bool* notSkipped) {
bool qualified = false;
for(int32_t k = 0; k < pFieldFilters->pFilters->numOfFilters; ++k) {
__filter_func_t fp = taosArrayGetP(pFieldFilters->fp, k);
SColumnFilterElem filterElem = {.filterInfo = pFieldFilters->pFilters->filterInfo[k]};
bool isnull = isNull(input, type);
if (isnull) {
if (fp == isNullOperator) {
qualified = true;
break;
} else {
continue;
}
} else {
if (fp == notNullOperator) {
qualified = true;
break;
} else if (fp == isNullOperator) {
continue;
}
}
if (fp(&filterElem, input, input, type)) {
qualified = true;
break;
}
}
*notSkipped = qualified;
return TSDB_CODE_SUCCESS;
}
int32_t doHavingFilter(SQueryInfo* pQueryInfo, tFilePage* pOutput, bool* notSkipped) {
*notSkipped = true;
if (pQueryInfo->havingFieldNum <= 0) {
return TSDB_CODE_SUCCESS;
}
//int32_t exprNum = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
size_t numOfOutput = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < numOfOutput; ++i) {
SInternalField* pInterField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SExprFilter* pFieldFilters = pInterField->pFieldFilters;
if (pFieldFilters == NULL) {
continue;
}
int32_t type = pInterField->field.type;
char* pInput = pOutput->data + pOutput->num* pFieldFilters->pSqlExpr->offset;
doFilterFieldData(pInput, pFieldFilters, type, notSkipped);
if (*notSkipped == false) {
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_SUCCESS;
}
/**
*
* @param pSql
@ -1283,6 +1354,22 @@ bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurren
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalMerge->finalModel->rowSize);
}
bool notSkipped = true;
doHavingFilter(pQueryInfo, pResBuf, &notSkipped);
if (!notSkipped) {
pRes->numOfRows = 0;
pLocalMerge->discard = !noMoreCurrentGroupRes;
if (pLocalMerge->discard) {
SColumnModel *pInternModel = pLocalMerge->pDesc->pColumnModel;
tColModelAppend(pInternModel, pLocalMerge->discardData, pLocalMerge->pTempBuffer->data, 0, 1, 1);
}
return notSkipped;
}
// no interval query, no fill operation
if (pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
genFinalResWithoutFill(pRes, pLocalMerge, pQueryInfo);

View File

@ -937,6 +937,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
return ret;
}
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, true);
if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
return code;
@ -945,6 +949,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
} else {
sql = sToken.z;
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, false);
if (pCmd->curSql == NULL) {
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
@ -953,10 +961,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
*sqlstr = sql;
if (*sqlstr == NULL) {
code = TSDB_CODE_TSC_INVALID_SQL;
}
return code;
}

View File

@ -34,6 +34,7 @@
#include "tstoken.h"
#include "tstrbuild.h"
#include "ttokendef.h"
#include "qUtil.h"
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
@ -1097,6 +1098,7 @@ static bool validateTableColumnInfo(SArray* pFieldList, SSqlCmd* pCmd) {
return true;
}
static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pCmd) {
assert(pTagsList != NULL);
@ -1676,18 +1678,6 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelectLis
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
/*
* transfer sql functions that need secondary merge into another format
* in dealing with super table queries such as: count/first/last
*/
if (isSTable) {
tscTansformFuncForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
return TSDB_CODE_SUCCESS;
}
@ -3065,6 +3055,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd)
return TSDB_CODE_SUCCESS;
}
static SColumnFilterInfo* addColumnFilterInfo(SColumn* pColumn) {
if (pColumn == NULL) {
return NULL;
@ -3088,15 +3079,11 @@ static SColumnFilterInfo* addColumnFilterInfo(SColumn* pColumn) {
}
static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SColumnFilterInfo* pColumnFilter,
SColumnIndex* columnIndex, tSqlExpr* pExpr) {
int16_t colType, tSqlExpr* pExpr) {
const char* msg = "not supported filter condition";
tSqlExpr* pRight = pExpr->pRight;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, columnIndex->tableIndex);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, columnIndex->columnIndex);
int16_t colType = pSchema->type;
if (colType >= TSDB_DATA_TYPE_TINYINT && colType <= TSDB_DATA_TYPE_BIGINT) {
colType = TSDB_DATA_TYPE_BIGINT;
} else if (colType == TSDB_DATA_TYPE_FLOAT || colType == TSDB_DATA_TYPE_DOUBLE) {
@ -3301,7 +3288,10 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
}
pColumn->colIndex = *pIndex;
return doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, pIndex, pExpr);
int16_t colType = pSchema->type;
return doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, colType, pExpr);
}
static int32_t getTablenameCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pTableCond, SStringBuilder* sb) {
@ -6030,7 +6020,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true);
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, (int32_t)size - pQueryInfo->havingFieldNum, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true);
memset(pExpr->aliasName, 0, sizeof(pExpr->aliasName));
tstrncpy(pExpr->aliasName, name, sizeof(pExpr->aliasName));
@ -6039,7 +6029,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
// NOTE: tag column does not add to source column list
SColumnList ids = getColumnList(1, 0, pColIndex->colIndex);
insertResultField(pQueryInfo, (int32_t)size, &ids, bytes, (int8_t)type, name, pExpr);
insertResultField(pQueryInfo, (int32_t)size - pQueryInfo->havingFieldNum, &ids, bytes, (int8_t)type, name, pExpr);
} else {
// if this query is "group by" normal column, time window query is not allowed
if (isTimeWindowQuery(pQueryInfo)) {
@ -6166,7 +6156,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
// projection query on super table does not compatible with "group by" syntax
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (tscIsProjectionQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
@ -6769,6 +6759,313 @@ static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS;
}
int32_t tscInsertExprFields(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SInternalField** interField) {
tSqlExprItem item = {.pNode = pExpr, .aliasName = NULL, .distinct = false};
int32_t outputIndex = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
// ADD TRUE FOR TEST
if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, true) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
++pQueryInfo->havingFieldNum;
size_t n = tscSqlExprNumOfExprs(pQueryInfo);
SSqlExpr* pSqlExpr = tscSqlExprGet(pQueryInfo, (int32_t)n - 1);
int32_t slot = tscNumOfFields(pQueryInfo) - 1;
SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, slot);
pInfo->visible = false;
if (pInfo->pFieldFilters == NULL) {
SExprFilter* pFieldFilters = calloc(1, sizeof(SExprFilter));
if (pFieldFilters == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SColumn* pFilters = calloc(1, sizeof(SColumn));
if (pFilters == NULL) {
tfree(pFieldFilters);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pFieldFilters->pFilters = pFilters;
pFieldFilters->pSqlExpr = pSqlExpr;
pSqlExpr->pFilter = pFilters;
pInfo->pFieldFilters = pFieldFilters;
}
pInfo->pFieldFilters->pExpr = pExpr;
*interField = pInfo;
return TSDB_CODE_SUCCESS;
}
int32_t tscGetExprFilters(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SInternalField** pField) {
SInternalField* pInfo = NULL;
for (int32_t i = pQueryInfo->havingFieldNum - 1; i >= 0; --i) {
pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, pQueryInfo->fieldsInfo.numOfOutput - 1 - i);
if (pInfo->pFieldFilters && 0 == tSqlExprCompare(pInfo->pFieldFilters->pExpr, pExpr)) {
*pField = pInfo;
return TSDB_CODE_SUCCESS;
}
}
int32_t ret = tscInsertExprFields(pCmd, pQueryInfo, pExpr, &pInfo);
if (ret) {
return ret;
}
*pField = pInfo;
return TSDB_CODE_SUCCESS;
}
static int32_t genExprFilter(SExprFilter * exprFilter) {
exprFilter->fp = taosArrayInit(4, sizeof(__filter_func_t));
if (exprFilter->fp == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < exprFilter->pFilters->numOfFilters; ++i) {
SColumnFilterInfo *filterInfo = &exprFilter->pFilters->filterInfo[i];
int32_t lower = filterInfo->lowerRelOptr;
int32_t upper = filterInfo->upperRelOptr;
if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) {
tscError("invalid rel optr");
return TSDB_CODE_TSC_APP_ERROR;
}
__filter_func_t ffp = getFilterOperator(lower, upper);
if (ffp == NULL) {
tscError("invalid filter info");
return TSDB_CODE_TSC_APP_ERROR;
}
taosArrayPush(exprFilter->fp, &ffp);
}
return TSDB_CODE_SUCCESS;
}
static int32_t handleExprInHavingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, int32_t sqlOptr) {
const char* msg1 = "non binary column not support like operator";
const char* msg2 = "invalid operator for binary column in having clause";
const char* msg3 = "invalid operator for bool column in having clause";
SColumn* pColumn = NULL;
SColumnFilterInfo* pColFilter = NULL;
SInternalField* pInfo = NULL;
/*
* in case of TK_AND filter condition, we first find the corresponding column and build the query condition together
* the already existed condition.
*/
if (sqlOptr == TK_AND) {
int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pExpr->pLeft, &pInfo);
if (ret) {
return ret;
}
pColumn = pInfo->pFieldFilters->pFilters;
// this is a new filter condition on this column
if (pColumn->numOfFilters == 0) {
pColFilter = addColumnFilterInfo(pColumn);
} else { // update the existed column filter information, find the filter info here
pColFilter = &pColumn->filterInfo[0];
}
if (pColFilter == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
} else if (sqlOptr == TK_OR) {
int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pExpr->pLeft, &pInfo);
if (ret) {
return ret;
}
pColumn = pInfo->pFieldFilters->pFilters;
// TODO fixme: failed to invalid the filter expression: "col1 = 1 OR col2 = 2"
pColFilter = addColumnFilterInfo(pColumn);
if (pColFilter == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
} else { // error;
return TSDB_CODE_TSC_INVALID_SQL;
}
pColFilter->filterstr =
((pInfo->field.type == TSDB_DATA_TYPE_BINARY || pInfo->field.type == TSDB_DATA_TYPE_NCHAR) ? 1 : 0);
if (pColFilter->filterstr) {
if (pExpr->tokenId != TK_EQ
&& pExpr->tokenId != TK_NE
&& pExpr->tokenId != TK_ISNULL
&& pExpr->tokenId != TK_NOTNULL
&& pExpr->tokenId != TK_LIKE
) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
} else {
if (pExpr->tokenId == TK_LIKE) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pInfo->field.type == TSDB_DATA_TYPE_BOOL) {
if (pExpr->tokenId != TK_EQ && pExpr->tokenId != TK_NE) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
}
int32_t ret = doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, pInfo->field.type, pExpr);
if (ret) {
return ret;
}
return genExprFilter(pInfo->pFieldFilters);
}
int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, int32_t parentOptr) {
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
const char* msg1 = "invalid having clause";
tSqlExpr* pLeft = pExpr->pLeft;
tSqlExpr* pRight = pExpr->pRight;
if (pExpr->tokenId == TK_AND || pExpr->tokenId == TK_OR) {
int32_t ret = getHavingExpr(pCmd, pQueryInfo, pExpr->pLeft, pExpr->tokenId);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
return getHavingExpr(pCmd, pQueryInfo, pExpr->pRight, pExpr->tokenId);
}
if (pLeft == NULL || pRight == NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pLeft->type == pRight->type) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
exchangeExpr(pExpr);
pLeft = pExpr->pLeft;
pRight = pExpr->pRight;
if (pLeft->type != SQL_NODE_SQLFUNCTION) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pRight->type != SQL_NODE_VALUE) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pExpr->tokenId >= TK_BITAND) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
//if (pLeft->pParam == NULL || pLeft->pParam->nExpr < 1) {
// return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
//}
if (pLeft->pParam) {
size_t size = taosArrayGetSize(pLeft->pParam);
for (int32_t i = 0; i < size; i++) {
tSqlExprItem* pParamElem = taosArrayGet(pLeft->pParam, i);
if (pParamElem->pNode->tokenId != TK_ALL &&
pParamElem->pNode->tokenId != TK_ID &&
pParamElem->pNode->tokenId != TK_STRING &&
pParamElem->pNode->tokenId != TK_INTEGER &&
pParamElem->pNode->tokenId != TK_FLOAT) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pParamElem->pNode->tokenId == TK_ID && (pParamElem->pNode->colInfo.z == NULL && pParamElem->pNode->colInfo.n == 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pParamElem->pNode->tokenId == TK_ID) {
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if ((getColumnIndexByName(pCmd, &pParamElem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (index.columnIndex <= 0 ||
index.columnIndex >= tscGetNumOfColumns(pTableMeta)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
}
}
}
pLeft->functionId = isValidFunction(pLeft->operand.z, pLeft->operand.n);
if (pLeft->functionId < 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
return handleExprInHavingClause(pCmd, pQueryInfo, pExpr, parentOptr);
}
int32_t parseHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* pCmd, bool isSTable, int32_t joinQuery, int32_t timeWindowQuery) {
const char* msg1 = "having only works with group by";
const char* msg2 = "functions or others can not be mixed up";
const char* msg3 = "invalid expression in having clause";
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
if (pQueryInfo->groupbyExpr.numOfGroupCols <= 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pExpr->pLeft == NULL || pExpr->pRight == NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
int32_t ret = 0;
if ((ret = getHavingExpr(pCmd, pQueryInfo, pExpr, TK_AND)) != TSDB_CODE_SUCCESS) {
return ret;
}
//REDO function check
if (!functionCompatibleCheck(pQueryInfo, joinQuery, timeWindowQuery)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
return TSDB_CODE_SUCCESS;
}
int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) {
assert(pQuerySqlNode != NULL && (pQuerySqlNode->from == NULL || taosArrayGetSize(pQuerySqlNode->from->tableList) > 0));
@ -6934,6 +7231,23 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i
}
}
// parse the having clause in the first place
if (parseHavingClause(pQueryInfo, pQuerySqlNode->pHaving, pCmd, isSTable, joinQuery, timeWindowQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
/*
* transfer sql functions that need secondary merge into another format
* in dealing with super table queries such as: count/first/last
*/
if (isSTable) {
tscTansformFuncForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
if (parseSessionClause(pCmd, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
@ -7125,3 +7439,10 @@ bool hasNormalColumnFilter(SQueryInfo* pQueryInfo) {
return false;
}

View File

@ -889,8 +889,44 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr->functionId = htons(pExpr->functionId);
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
pSqlFuncExpr->resColId = htons(pExpr->resColId);
if (pTableMeta->tableType != TSDB_SUPER_TABLE && pExpr->pFilter && pExpr->pFilter->numOfFilters > 0) {
pSqlFuncExpr->filterNum = htonl(pExpr->pFilter->numOfFilters);
} else {
pSqlFuncExpr->filterNum = 0;
}
pMsg += sizeof(SSqlFuncMsg);
if (pSqlFuncExpr->filterNum) {
pMsg += sizeof(SColumnFilterInfo) * pExpr->pFilter->numOfFilters;
// append the filter information after the basic column information
for (int32_t f = 0; f < pExpr->pFilter->numOfFilters; ++f) {
SColumnFilterInfo *pColFilter = &pExpr->pFilter->filterInfo[f];
SColumnFilterInfo *pFilterMsg = &pSqlFuncExpr->filterInfo[f];
pFilterMsg->filterstr = htons(pColFilter->filterstr);
if (pColFilter->filterstr) {
pFilterMsg->len = htobe64(pColFilter->len);
memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
pMsg += (pColFilter->len + 1); // append the additional filter binary info
} else {
pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
}
pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
tscError("invalid filter info");
return TSDB_CODE_TSC_INVALID_SQL;
}
}
}
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

View File

@ -405,6 +405,7 @@ int taos_affected_rows(TAOS_RES *tres) {
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
SSqlRes *pRes = &pSql->res;
if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
@ -419,7 +420,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SFieldInfo *pFieldInfo = &pQueryInfo->fieldsInfo;
if (pFieldInfo->final == NULL) {
if (pRes->final == NULL) {
TAOS_FIELD* f = calloc(pFieldInfo->numOfOutput, sizeof(TAOS_FIELD));
int32_t j = 0;
@ -439,10 +440,10 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
}
}
pFieldInfo->final = f;
pRes->final = f;
}
return pFieldInfo->final;
return pRes->final;
}
static bool needToFetchNewBlock(SSqlObj* pSql) {

View File

@ -430,6 +430,8 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
tfree(pRes->pArithSup);
}
tfree(pRes->final);
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
}
@ -1045,6 +1047,7 @@ SInternalField* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) {
.pSqlExpr = NULL,
.pArithExprInfo = NULL,
.visible = true,
.pFieldFilters = NULL,
};
info.field = *pField;
@ -1057,6 +1060,7 @@ SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_F
.pSqlExpr = NULL,
.pArithExprInfo = NULL,
.visible = true,
.pFieldFilters = NULL,
};
info.field = *field;
@ -1130,6 +1134,22 @@ int32_t tscGetResRowLength(SArray* pExprList) {
return size;
}
static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) {
for(int32_t i = 0; i < numOfFilters; ++i) {
if (pFilterInfo[i].filterstr) {
tfree(pFilterInfo[i].pz);
}
}
tfree(pFilterInfo);
}
static void tscColumnDestroy(SColumn* pCol) {
destroyFilterInfo(pCol->filterInfo, pCol->numOfFilters);
free(pCol);
}
void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
if (pFieldInfo == NULL) {
return;
@ -1150,10 +1170,14 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
tfree(pInfo->pArithExprInfo);
}
if (pInfo->pFieldFilters != NULL) {
tscColumnDestroy(pInfo->pFieldFilters->pFilters);
tfree(pInfo->pFieldFilters);
}
}
taosArrayDestroy(pFieldInfo->internalField);
tfree(pFieldInfo->final);
memset(pFieldInfo, 0, sizeof(SFieldInfo));
}
@ -1411,15 +1435,7 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
return taosArrayGetP(pColumnList, i);
}
static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) {
for(int32_t i = 0; i < numOfFilters; ++i) {
if (pFilterInfo[i].filterstr) {
tfree(pFilterInfo[i].pz);
}
}
tfree(pFilterInfo);
}
SColumn* tscColumnClone(const SColumn* src) {
assert(src != NULL);
@ -1436,10 +1452,6 @@ SColumn* tscColumnClone(const SColumn* src) {
return dst;
}
static void tscColumnDestroy(SColumn* pCol) {
destroyFilterInfo(pCol->filterInfo, pCol->numOfFilters);
free(pCol);
}
void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex) {
assert(src != NULL && dst != NULL);

View File

@ -39,6 +39,7 @@ extern int8_t tsEnableTelemetryReporting;
extern char tsEmail[];
extern char tsArbitrator[];
extern int8_t tsArbOnline;
extern int32_t tsDnodeId;
// common
extern int tsRpcTimer;

View File

@ -43,6 +43,7 @@ int8_t tsEnableVnodeBak = 1;
int8_t tsEnableTelemetryReporting = 1;
int8_t tsArbOnline = 0;
char tsEmail[TSDB_FQDN_LEN] = {0};
int32_t tsDnodeId = 0;
// common
int32_t tsRpcTimer = 1000;
@ -212,7 +213,7 @@ float tsAvailTmpDirectorySpace = 0;
float tsAvailDataDirGB = 0;
float tsUsedDataDirGB = 0;
float tsReservedTmpDirectorySpace = 1.0f;
float tsMinimalDataDirGB = 1.0f;
float tsMinimalDataDirGB = 2.0f;
int32_t tsTotalMemoryMB = 0;
uint32_t tsVersion = 0;

View File

@ -73,6 +73,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
int32_t cqObjRef = -1;
int32_t cqVnodeNum = 0;
void cqRmFromList(SCqObj *pObj) {
//LOCK in caller
@ -166,6 +167,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
return NULL;
}
atomic_add_fetch_32(&cqVnodeNum, 1);
cqCreateRef();
pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ");
@ -240,6 +243,13 @@ void cqClose(void *handle) {
if (hasCq == 0) {
freeSCqContext(pContext);
}
int32_t remainn = atomic_sub_fetch_32(&cqVnodeNum, 1);
if (remainn <= 0) {
int32_t ref = cqObjRef;
cqObjRef = -1;
taosCloseRef(ref);
}
}
void cqStart(void *handle) {

View File

@ -17,6 +17,7 @@
#include "os.h"
#include "cJSON.h"
#include "dnodeCfg.h"
#include "tglobal.h"
static SDnodeCfg tsCfg = {0};
static pthread_mutex_t tsCfgMutex;
@ -70,6 +71,7 @@ static void dnodeResetCfg(SDnodeCfg *cfg) {
pthread_mutex_lock(&tsCfgMutex);
tsCfg.dnodeId = cfg->dnodeId;
tsDnodeId = cfg->dnodeId;
tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
dnodePrintCfg(cfg);
dnodeWriteCfg();

View File

@ -88,10 +88,11 @@ void* qOpenQueryMgmt(int32_t vgId);
void qQueryMgmtNotifyClosed(void* pExecutor);
void qQueryMgmtReOpen(void *pExecutor);
void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo);
void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo);
void** qAcquireQInfo(void* pMgmt, uint64_t key);
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
bool checkQIdEqual(void *qHandle, uint64_t qId);
int64_t genQueryId(void);
#ifdef __cplusplus
}

View File

@ -163,6 +163,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0362) //"Table does not exist")
#define TSDB_CODE_MND_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0363) //"Invalid table type in tsdb")
#define TSDB_CODE_MND_TOO_MANY_TAGS TAOS_DEF_ERROR_CODE(0, 0x0364) //"Too many tags")
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x0365) //"Too many columns")
#define TSDB_CODE_MND_TOO_MANY_TIMESERIES TAOS_DEF_ERROR_CODE(0, 0x0366) //"Too many time series")
#define TSDB_CODE_MND_NOT_SUPER_TABLE TAOS_DEF_ERROR_CODE(0, 0x0367) //"Not super table") // operation only available for super table
#define TSDB_CODE_MND_COL_NAME_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x0368) //"Tag name too long")

View File

@ -399,36 +399,6 @@ typedef struct SColIndex {
char name[TSDB_COL_NAME_LEN]; // TODO remove it
} SColIndex;
/* sql function msg, to describe the message to vnode about sql function
* operations in select clause */
typedef struct SSqlFuncMsg {
int16_t functionId;
int16_t numOfParams;
int16_t resColId; // result column id, id of the current output column
int16_t colType;
int16_t colBytes;
SColIndex colInfo;
struct ArgElem {
int16_t argType;
int16_t argBytes;
union {
double d;
int64_t i64;
char * pz;
} argValue;
} arg[3];
} SSqlFuncMsg;
typedef struct SExprInfo {
SSqlFuncMsg base;
struct tExprNode* pExpr;
int16_t bytes;
int16_t type;
int32_t interBytes;
int64_t uid;
} SExprInfo;
typedef struct SColumnFilterInfo {
int16_t lowerRelOptr;
@ -451,6 +421,42 @@ typedef struct SColumnFilterInfo {
};
} SColumnFilterInfo;
/* sql function msg, to describe the message to vnode about sql function
* operations in select clause */
typedef struct SSqlFuncMsg {
int16_t functionId;
int16_t numOfParams;
int16_t resColId; // result column id, id of the current output column
int16_t colType;
int16_t colBytes;
SColIndex colInfo;
struct ArgElem {
int16_t argType;
int16_t argBytes;
union {
double d;
int64_t i64;
char * pz;
} argValue;
} arg[3];
int32_t filterNum;
SColumnFilterInfo filterInfo[];
} SSqlFuncMsg;
typedef struct SExprInfo {
SColumnFilterInfo * pFilter;
struct tExprNode* pExpr;
int16_t bytes;
int16_t type;
int32_t interBytes;
int64_t uid;
SSqlFuncMsg base;
} SExprInfo;
/*
* for client side struct, we only need the column id, type, bytes are not necessary
* But for data in vnode side, we need all the following information.

View File

@ -205,6 +205,11 @@
#define TK_VALUES 186
#define TK_SPACE 300
#define TK_COMMENT 301
#define TK_ILLEGAL 302

View File

@ -27,7 +27,7 @@
#define MAX_IP_SIZE 20
#define MAX_PASSWORD_SIZE 20
#define MAX_HISTORY_SIZE 1000
#define MAX_COMMAND_SIZE 65536
#define MAX_COMMAND_SIZE 1048586
#define HISTORY_FILE ".taos_history"
#define DEFAULT_RES_SHOW_NUM 100

View File

@ -238,7 +238,7 @@ void resetCommand(Command *cmd, const char s[]) {
clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size);
memset(cmd->buffer, 0, MAX_COMMAND_SIZE);
memset(cmd->command, 0, MAX_COMMAND_SIZE);
strcpy(cmd->command, s);
strncpy(cmd->command, s, MAX_COMMAND_SIZE);
int size = 0;
int width = 0;
getMbSizeInfo(s, &size, &width);

File diff suppressed because it is too large Load Diff

View File

@ -437,14 +437,14 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) {
return TAOS_DN_OFF_TIME_ZONE_NOT_MATCH;
}
if (0 != strncasecmp(clusterCfg->locale, tsLocale, strlen(tsLocale))) {
mError("\"locale\"[%s - %s] cfg parameters inconsistent", clusterCfg->locale, tsLocale);
return TAOS_DN_OFF_LOCALE_NOT_MATCH;
}
if (0 != strncasecmp(clusterCfg->charset, tsCharset, strlen(tsCharset))) {
mError("\"charset\"[%s - %s] cfg parameters inconsistent.", clusterCfg->charset, tsCharset);
return TAOS_DN_OFF_CHARSET_NOT_MATCH;
}
// if (0 != strncasecmp(clusterCfg->locale, tsLocale, strlen(tsLocale))) {
// mError("\"locale\"[%s - %s] cfg parameters inconsistent", clusterCfg->locale, tsLocale);
// return TAOS_DN_OFF_LOCALE_NOT_MATCH;
// }
// if (0 != strncasecmp(clusterCfg->charset, tsCharset, strlen(tsCharset))) {
// mError("\"charset\"[%s - %s] cfg parameters inconsistent.", clusterCfg->charset, tsCharset);
// return TAOS_DN_OFF_CHARSET_NOT_MATCH;
// }
if (clusterCfg->enableBalance != tsEnableBalance) {
mError("\"balance\"[%d - %d] cfg parameters inconsistent", clusterCfg->enableBalance, tsEnableBalance);

View File

@ -1037,6 +1037,19 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
SCreateTableMsg* pCreate = (SCreateTableMsg*)((char*)pCreate1 + sizeof(SCMCreateTableMsg));
int16_t numOfTags = htons(pCreate->numOfTags);
if (numOfTags > TSDB_MAX_TAGS) {
mError("msg:%p, app:%p table:%s, failed to create, too many tags", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
return TSDB_CODE_MND_TOO_MANY_TAGS;
}
int16_t numOfColumns = htons(pCreate->numOfColumns);
int32_t numOfCols = numOfColumns + numOfTags;
if (numOfCols > TSDB_MAX_COLUMNS) {
mError("msg:%p, app:%p table:%s, failed to create, too many columns", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
return TSDB_CODE_MND_TOO_MANY_COLUMNS;
}
SSTableObj * pStable = calloc(1, sizeof(SSTableObj));
if (pStable == NULL) {
mError("msg:%p, app:%p table:%s, failed to create, no enough memory", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
@ -1050,10 +1063,9 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
pStable->uid = (us << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
pStable->sversion = 0;
pStable->tversion = 0;
pStable->numOfColumns = htons(pCreate->numOfColumns);
pStable->numOfTags = htons(pCreate->numOfTags);
pStable->numOfColumns = numOfColumns;
pStable->numOfTags = numOfTags;
int32_t numOfCols = pStable->numOfColumns + pStable->numOfTags;
int32_t schemaSize = numOfCols * sizeof(SSchema);
pStable->schema = (SSchema *)calloc(1, schemaSize);
if (pStable->schema == NULL) {
@ -1064,11 +1076,6 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
if (pStable->numOfColumns > TSDB_MAX_COLUMNS || pStable->numOfTags > TSDB_MAX_TAGS) {
mError("msg:%p, app:%p table:%s, failed to create, too many columns", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
return TSDB_CODE_MND_INVALID_TABLE_NAME;
}
pStable->nextColId = 0;
for (int32_t col = 0; col < numOfCols; col++) {
@ -1340,6 +1347,11 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
return TSDB_CODE_MND_APP_ERROR;
}
if (pStable->numOfColumns + ncols + pStable->numOfTags > TSDB_MAX_COLUMNS) {
mError("msg:%p, app:%p stable:%s, add column, too many columns", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId);
return TSDB_CODE_MND_TOO_MANY_COLUMNS;
}
for (int32_t i = 0; i < ncols; i++) {
if (mnodeFindSuperTableColumnIndex(pStable, schema[i].name) > 0) {
mError("msg:%p, app:%p stable:%s, add column, column:%s already exist", pMsg, pMsg->rpcMsg.ahandle,

View File

@ -994,6 +994,7 @@ void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
pVgroup->dbName);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
if (pVgroup->vnodeGid[i].role != TAOS_SYNC_ROLE_SLAVE) continue;
SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mDebug("vgId:%d, index:%d, send sync vnode msg to dnode %s", pVgroup->vgId, i,
pVgroup->vnodeGid[i].pDnode->dnodeEp);

View File

@ -190,6 +190,8 @@ typedef struct SQuery {
bool stabledev; // super table stddev query
int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number
SOrderVal order;
int16_t numOfCols;
int16_t numOfTags;
@ -285,6 +287,7 @@ enum OPERATOR_TYPE_E {
OP_Fill = 13,
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
OP_Having = 16,
};
typedef struct SOperatorInfo {
@ -402,6 +405,11 @@ typedef struct SOffsetOperatorInfo {
int64_t offset;
} SOffsetOperatorInfo;
typedef struct SHavingOperatorInfo {
SArray* fp;
} SHavingOperatorInfo;
typedef struct SFillOperatorInfo {
SFillInfo *pFillInfo;
SSDataBlock *pRes;

View File

@ -98,6 +98,7 @@ typedef struct SQuerySqlNode {
SLimitVal limit; // limit offset [optional]
SLimitVal slimit; // group limit offset [optional]
SStrToken sqlstr; // sql string in select clause
struct tSqlExpr *pHaving; // having clause [optional]
} SQuerySqlNode;
typedef struct STableNamePair {
@ -253,6 +254,11 @@ SArray *tVariantListAppend(SArray *pList, tVariant *pVar, uint8_t sortOrder);
SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int32_t index);
SArray *tVariantListAppendToken(SArray *pList, SStrToken *pAliasToken, uint8_t sortOrder);
tSqlExpr *tSqlExprCreate(tSqlExpr *pLeft, tSqlExpr *pRight, int32_t optrType);
int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right);
tSqlExpr *tSqlExprClone(tSqlExpr *pSrc);
SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias);
SFromInfo *setSubquery(SFromInfo* pFromInfo, SQuerySqlNode *pSqlNode);
void *destroyFromInfo(SFromInfo* pFromInfo);
@ -272,7 +278,7 @@ void tSqlExprListDestroy(SArray *pList);
SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SFromInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit);
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit, tSqlExpr *pHaving);
SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SQuerySqlNode *pSelect, int32_t type);

View File

@ -453,7 +453,7 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
%type select {SQuerySqlNode*}
%destructor select {destroyQuerySqlNode($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G);
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G, N);
}
select(A) ::= LP select(B) RP. {A = B;}
@ -471,7 +471,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); }
// select client_version()
// select server_state()
select(A) ::= SELECT(T) selcollist(W). {
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
}
// selcollist is a list of expressions that are to become the return

View File

@ -2771,14 +2771,16 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1;
// all data are null, set it completed
if (pInfo->numOfElems == 0) {
pResInfo->complete = true;
return;
} else {
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
}
pInfo->stage += 1;
}
// the first stage, only acquire the min/max value
@ -2857,14 +2859,16 @@ static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SPercentileInfo *pInfo = (SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1;
// all data are null, set it completed
if (pInfo->numOfElems == 0) {
pResInfo->complete = true;
return;
} else {
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
}
pInfo->stage += 1;
}
if (pInfo->stage == 0) {

View File

@ -105,6 +105,30 @@ int32_t getMaximumIdleDurationSec() {
return tsShellActivityTimer * 2;
}
int64_t genQueryId(void) {
int64_t uid = 0;
int64_t did = tsDnodeId;
uid = did << 54;
int64_t pid = ((int64_t)taosGetPId()) & 0x3FF;
uid |= pid << 44;
int64_t ts = taosGetTimestampMs() & 0x1FFFFFFFF;
uid |= ts << 11;
int64_t sid = atomic_add_fetch_64(&queryHandleId, 1) & 0x7FF;
uid |= sid;
return uid;
}
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (pQuery->interval.intervalUnit != 'n' && pQuery->interval.intervalUnit != 'y') {
@ -181,6 +205,7 @@ static SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntime
static SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
static SOperatorInfo* createHavingOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
@ -1819,6 +1844,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
}
if (pQuery->havingNum > 0) {
pRuntimeEnv->proot = createHavingOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
}
if (pQuery->limit.offset > 0) {
pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
}
@ -4013,7 +4042,7 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in
return pFillCol;
}
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t vgId, bool isSTableQuery) {
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
@ -4024,8 +4053,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery);
pQuery->stabledev = isStabledev(pQuery);
pRuntimeEnv->prevResult = prevResult;
setScanLimitationByResultBuffer(pQuery);
int32_t code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
@ -4667,6 +4694,111 @@ static SSDataBlock* doOffset(void* param) {
}
}
bool doFilterData(SColumnInfoData* p, int32_t rid, SColumnFilterElem *filterElem, __filter_func_t fp) {
char* input = p->pData + p->info.bytes * rid;
bool isnull = isNull(input, p->info.type);
if (isnull) {
return (fp == isNullOperator) ? true : false;
} else {
if (fp == notNullOperator) {
return true;
} else if (fp == isNullOperator) {
return false;
}
}
if (fp(filterElem, input, input, p->info.type)) {
return true;
}
return false;
}
void doHavingImpl(SOperatorInfo *pOperator, SSDataBlock *pBlock) {
SHavingOperatorInfo* pInfo = pOperator->info;
int32_t f = 0;
int32_t allQualified = 1;
int32_t exprQualified = 0;
for (int32_t r = 0; r < pBlock->info.rows; ++r) {
allQualified = 1;
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
SExprInfo* pExprInfo = &(pOperator->pExpr[i]);
if (pExprInfo->pFilter == NULL) {
continue;
}
SArray* es = taosArrayGetP(pInfo->fp, i);
assert(es);
size_t fpNum = taosArrayGetSize(es);
exprQualified = 0;
for (int32_t m = 0; m < fpNum; ++m) {
__filter_func_t fp = taosArrayGetP(es, m);
assert(fp);
//SColIndex* colIdx = &pExprInfo->base.colInfo;
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
SColumnFilterElem filterElem = {.filterInfo = pExprInfo->pFilter[m]};
if (doFilterData(p, r, &filterElem, fp)) {
exprQualified = 1;
break;
}
}
if (exprQualified == 0) {
allQualified = 0;
break;
}
}
if (allQualified == 0) {
continue;
}
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData + f * bytes, pColInfoData->pData + bytes * r, bytes);
}
++f;
}
pBlock->info.rows = f;
}
static SSDataBlock* doHaving(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo *)param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
while (1) {
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) {
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
doHavingImpl(pOperator, pBlock);
return pBlock;
}
}
static SSDataBlock* doIntervalAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
@ -5017,6 +5149,13 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
static void destroyHavingOperatorInfo(void* param, int32_t numOfOutput) {
SHavingOperatorInfo* pInfo = (SHavingOperatorInfo*) param;
if (pInfo->fp) {
taosArrayDestroy(pInfo->fp);
}
}
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
@ -5073,6 +5212,83 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
return pOperator;
}
int32_t initFilterFp(SExprInfo* pExpr, int32_t numOfOutput, SArray** fps) {
__filter_func_t fp = NULL;
*fps = taosArrayInit(numOfOutput, sizeof(SArray*));
if (*fps == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = &(pExpr[i]);
SColIndex* colIdx = &pExprInfo->base.colInfo;
if (pExprInfo->pFilter == NULL || !TSDB_COL_IS_NORMAL_COL(colIdx->flag)) {
taosArrayPush(*fps, &fp);
continue;
}
int32_t filterNum = pExprInfo->base.filterNum;
SColumnFilterInfo *filterInfo = pExprInfo->pFilter;
SArray* es = taosArrayInit(filterNum, sizeof(__filter_func_t));
for (int32_t j = 0; j < filterNum; ++j) {
int32_t lower = filterInfo->lowerRelOptr;
int32_t upper = filterInfo->upperRelOptr;
if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) {
qError("invalid rel optr");
taosArrayDestroy(es);
return TSDB_CODE_QRY_APP_ERROR;
}
__filter_func_t ffp = getFilterOperator(lower, upper);
if (ffp == NULL) {
qError("invalid filter info");
taosArrayDestroy(es);
return TSDB_CODE_QRY_APP_ERROR;
}
taosArrayPush(es, &ffp);
filterInfo += 1;
}
taosArrayPush(*fps, &es);
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createHavingOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SHavingOperatorInfo* pInfo = calloc(1, sizeof(SHavingOperatorInfo));
initFilterFp(pExpr, numOfOutput, &pInfo->fp);
assert(pInfo->fp);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "HavingOperator";
pOperator->operatorType = OP_Having;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = pExpr;
pOperator->upstream = upstream;
pOperator->exec = doHaving;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanup = destroyHavingOperatorInfo;
return pOperator;
}
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQuery->limit.limit;
@ -5644,9 +5860,35 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pExprMsg->resColId = htons(pExprMsg->resColId);
pExprMsg->filterNum = htonl(pExprMsg->filterNum);
pMsg += sizeof(SSqlFuncMsg);
SColumnFilterInfo* pExprFilterInfo = pExprMsg->filterInfo;
pMsg += sizeof(SColumnFilterInfo) * pExprMsg->filterNum;
for (int32_t f = 0; f < pExprMsg->filterNum; ++f) {
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pExprFilterInfo;
pFilterMsg->filterstr = htons(pFilterMsg->filterstr);
if (pFilterMsg->filterstr) {
pFilterMsg->len = htobe64(pFilterMsg->len);
pFilterMsg->pz = (int64_t)pMsg;
pMsg += (pFilterMsg->len + 1);
} else {
pFilterMsg->lowerBndi = htobe64(pFilterMsg->lowerBndi);
pFilterMsg->upperBndi = htobe64(pFilterMsg->upperBndi);
}
pFilterMsg->lowerRelOptr = htons(pFilterMsg->lowerRelOptr);
pFilterMsg->upperRelOptr = htons(pFilterMsg->upperRelOptr);
pExprFilterInfo++;
}
for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) {
pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType);
pExprMsg->arg[j].argBytes = htons(pExprMsg->arg[j].argBytes);
@ -5831,6 +6073,42 @@ _cleanup:
return code;
}
int32_t cloneExprFilterInfo(SColumnFilterInfo **dst, SColumnFilterInfo* src, int32_t filterNum) {
if (filterNum <= 0) {
return TSDB_CODE_SUCCESS;
}
*dst = calloc(filterNum, sizeof(*src));
if (*dst == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(*dst, src, sizeof(*src) * filterNum);
for (int32_t i = 0; i < filterNum; i++) {
if ((*dst)[i].filterstr && dst[i]->len > 0) {
void *pz = calloc(1, (size_t)(*dst)[i].len + 1);
if (pz == NULL) {
if (i == 0) {
free(*dst);
} else {
freeColumnFilterInfo(*dst, i);
}
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(pz, (void *)src->pz, (size_t)src->len + 1);
(*dst)[i].pz = (int64_t)pz;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg);
@ -5944,6 +6222,13 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu
type = s->type;
bytes = s->bytes;
}
if (pExprs[i].base.filterNum > 0) {
int32_t ret = cloneExprFilterInfo(&pExprs[i].pFilter, pExprMsg[i]->filterInfo, pExprMsg[i]->filterNum);
if (ret) {
return ret;
}
}
}
int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64;
@ -6184,6 +6469,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
goto _cleanup_qinfo;
}
pQInfo->qId = *qId;
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
SQuery* pQuery = &pQInfo->query;
@ -6233,6 +6520,10 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
if (TSDB_COL_IS_TAG(pExprs[col].base.colInfo.flag)) {
pQuery->tagLen += pExprs[col].bytes;
}
if (pExprs[col].pFilter) {
++pQuery->havingNum;
}
}
doUpdateExprColumnIndex(pQuery);
@ -6316,8 +6607,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
// todo refactor
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1);
*qId = pQInfo->qId;
qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
return pQInfo;
@ -6336,6 +6625,10 @@ _cleanup_qinfo:
tExprTreeDestroy(pExprInfo->pExpr, NULL);
pExprInfo->pExpr = NULL;
}
if (pExprInfo->pFilter) {
freeColumnFilterInfo(pExprInfo->pFilter, pExprInfo->base.filterNum);
}
}
tfree(pExprs);
@ -6381,6 +6674,8 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p
SArray* prevResult = NULL;
if (pQueryMsg->prevResultLen > 0) {
prevResult = interResFromBinary(param->prevResult, pQueryMsg->prevResultLen);
pRuntimeEnv->prevResult = prevResult;
}
pQuery->precision = tsdbGetCfg(tsdb)->precision;
@ -6402,7 +6697,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p
}
// filter the qualified
if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) {
if ((code = doInitQInfo(pQInfo, pTsBuf, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -6420,7 +6715,7 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
}
for (int32_t i = 0; i < numOfFilters; i++) {
if (pFilter[i].filterstr) {
if (pFilter[i].filterstr && pFilter[i].pz) {
free((void*)(pFilter[i].pz));
}
}
@ -6462,6 +6757,10 @@ static void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr) {
if (pExprInfo[i].pExpr != NULL) {
tExprTreeDestroy(pExprInfo[i].pExpr, NULL);
}
if (pExprInfo[i].pFilter) {
freeColumnFilterInfo(pExprInfo[i].pFilter, pExprInfo[i].base.filterNum);
}
}
tfree(pExprInfo);

View File

@ -124,7 +124,7 @@ bool greaterEqualOperator(SColumnFilterElem *pFilter, const char *minval, const
bool equalOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) {
SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo;
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t minv = -1, maxv = -1;
GET_TYPED_DATA(minv, int64_t, type, minval);
GET_TYPED_DATA(maxv, int64_t, type, maxval);
@ -202,7 +202,7 @@ bool likeOperator(SColumnFilterElem *pFilter, const char *minval, const char *ma
bool notEqualOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) {
SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo;
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t minv = -1, maxv = -1;
GET_TYPED_DATA(minv, int64_t, type, minval);
GET_TYPED_DATA(maxv, int64_t, type, maxval);

View File

@ -287,6 +287,10 @@ static void lruListMoveToFront(SList *pList, SPageInfo* pi) {
tdListPrependNode(pList, pi->pn);
}
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
return pageSize + POINTER_BYTES + 2 + sizeof(tFilePage);
}
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
pResultBuf->statis.getPages += 1;
@ -311,7 +315,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
// allocate buf
if (availablePage == NULL) {
pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES + 2); // add extract bytes in case of zipped buffer increased.
pi->pData = calloc(1, getAllocPageSize(pResultBuf->pageSize)); // add extract bytes in case of zipped buffer increased.
} else {
pi->pData = availablePage;
}
@ -355,7 +359,7 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
}
if (availablePage == NULL) {
(*pi)->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES);
(*pi)->pData = calloc(1, getAllocPageSize(pResultBuf->pageSize));
} else {
(*pi)->pData = availablePage;
}

View File

@ -310,6 +310,77 @@ tSqlExpr *tSqlExprCreate(tSqlExpr *pLeft, tSqlExpr *pRight, int32_t optrType) {
return pExpr;
}
static FORCE_INLINE int32_t tStrTokenCompare(SStrToken* left, SStrToken* right) {
return (left->type == right->type && left->n == right->n && strncasecmp(left->z, right->z, left->n) == 0) ? 0 : 1;
}
int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right) {
if ((left == NULL && right) || (left && right == NULL)) {
return 1;
}
if (left->type != right->type) {
return 1;
}
if (left->tokenId != right->tokenId) {
return 1;
}
if (left->functionId != right->functionId) {
return 1;
}
if ((left->pLeft && right->pLeft == NULL)
|| (left->pLeft == NULL && right->pLeft)
|| (left->pRight && right->pRight == NULL)
|| (left->pRight == NULL && right->pRight)
|| (left->pParam && right->pParam == NULL)
|| (left->pParam == NULL && right->pParam)) {
return 1;
}
if (tVariantCompare(&left->value, &right->value)) {
return 1;
}
if (tStrTokenCompare(&left->colInfo, &right->colInfo)) {
return 1;
}
if (right->pParam && left->pParam) {
size_t size = taosArrayGetSize(right->pParam);
if (left->pParam && taosArrayGetSize(left->pParam) != size) {
return 1;
}
for (int32_t i = 0; i < size; i++) {
tSqlExprItem* pLeftElem = taosArrayGet(left->pParam, i);
tSqlExpr* pSubLeft = pLeftElem->pNode;
tSqlExprItem* pRightElem = taosArrayGet(left->pParam, i);
tSqlExpr* pSubRight = pRightElem->pNode;
if (tSqlExprCompare(pSubLeft, pSubRight)) {
return 1;
}
}
}
if (left->pLeft && tSqlExprCompare(left->pLeft, right->pLeft)) {
return 1;
}
if (left->pRight && tSqlExprCompare(left->pRight, right->pRight)) {
return 1;
}
return 0;
}
tSqlExpr *tSqlExprClone(tSqlExpr *pSrc) {
tSqlExpr *pExpr = calloc(1, sizeof(tSqlExpr));
@ -640,7 +711,7 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SFromInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SSessionWindowVal *pSession, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit,
SLimitVal *psLimit) {
SLimitVal *psLimit, tSqlExpr *pHaving) {
assert(pSelectList != NULL);
SQuerySqlNode *pSqlNode = calloc(1, sizeof(SQuerySqlNode));
@ -655,6 +726,7 @@ SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SF
pSqlNode->pSortOrder = pSortOrder;
pSqlNode->pWhere = pWhere;
pSqlNode->fillType = pFill;
pSqlNode->pHaving = pHaving;
if (pLimit != NULL) {
pSqlNode->limit = *pLimit;
@ -718,6 +790,9 @@ void destroyQuerySqlNode(SQuerySqlNode *pQuerySql) {
tSqlExprDestroy(pQuerySql->pWhere);
pQuerySql->pWhere = NULL;
tSqlExprDestroy(pQuerySql->pHaving);
pQuerySql->pHaving = NULL;
taosArrayDestroyEx(pQuerySql->pSortOrder, freeVariant);
pQuerySql->pSortOrder = NULL;

View File

@ -197,6 +197,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
return code;
}
bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo);
@ -475,7 +476,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
qDebug("vgId:%d, queryMgmt cleanup completed", vgId);
}
void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo) {
void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo) {
if (pMgmt == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
@ -516,8 +517,7 @@ void** qAcquireQInfo(void* pMgmt, uint64_t _key) {
return NULL;
}
TSDB_CACHE_PTR_TYPE key = (TSDB_CACHE_PTR_TYPE)_key;
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &key, sizeof(TSDB_CACHE_PTR_TYPE));
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &_key, sizeof(_key));
if (handle == NULL || *handle == NULL) {
terrno = TSDB_CODE_QRY_INVALID_QHANDLE;
return NULL;

File diff suppressed because it is too large Load Diff

View File

@ -409,23 +409,22 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force)
syncReleaseNode(pNode);
}
#if 1
void syncRecover(int64_t rid) {
SSyncPeer *pPeer;
SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return;
// to do: add a few lines to check if recover is OK
// if take this node to unsync state, the whole system may not work
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
nodeVersion = 0;
pthread_mutex_lock(&pNode->mutex);
nodeVersion = 0;
for (int32_t i = 0; i < pNode->replica; ++i) {
if (i == pNode->selfIndex) continue;
pPeer = pNode->peerInfo[i];
if (pPeer->peerFd >= 0) {
syncRestartConnection(pPeer);
@ -436,7 +435,6 @@ void syncRecover(int64_t rid) {
syncReleaseNode(pNode);
}
#endif
int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
SSyncNode *pNode = syncAcquireNode(rid);
@ -551,7 +549,10 @@ static void syncClosePeerConn(SSyncPeer *pPeer) {
if (pPeer->peerFd >= 0) {
pPeer->peerFd = -1;
void *pConn = pPeer->pConn;
if (pConn != NULL) syncFreeTcpConn(pPeer->pConn);
if (pConn != NULL) {
syncFreeTcpConn(pPeer->pConn);
pPeer->pConn = NULL;
}
}
}
@ -997,17 +998,24 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
int32_t code = 0;
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version;
(*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
code = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
} else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead);
code = syncSaveIntoBuffer(pPeer, pHead);
} else {
sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus],
pHead->version);
code = -1;
}
}
if (code != 0) {
sError("%s, failed to process fwd msg, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
syncRestartConnection(pPeer);
}
}
static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) {

View File

@ -2861,12 +2861,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
}
SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
pHandle->statis[i].max = pBlockInfo->compBlock->keyLast;
}
}
int64_t elapsed = taosGetTimestampUs() - stime;

View File

@ -175,6 +175,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_ID, "Table name too long")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_NAME, "Table does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_TYPE, "Invalid table type in tsdb")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TIMESERIES, "Too many time series")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_SUPER_TABLE, "Not super table") // operation only available for super table
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COL_NAME_TOO_LONG, "Tag name too long")

View File

@ -37,6 +37,7 @@ extern int32_t vDebugFlag;
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
int64_t queuedWMsgSize;
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t flowctrlLevel;

View File

@ -99,8 +99,13 @@ int32_t vnodeSync(int32_t vgId) {
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
if (pVnode->role == TAOS_SYNC_ROLE_SLAVE) {
vInfo("vgId:%d, vnode will sync, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
pVnode->version = 0;
pVnode->fversion = 0;
walResetVersion(pVnode->wal, pVnode->fversion);
syncRecover(pVnode->sync);
}

View File

@ -208,6 +208,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
pRsp->completed = true;
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
void * pCont = pRead->pCont;
int32_t contLen = pRead->contLen;
@ -226,7 +227,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (contLen != 0) {
qinfo_t pQInfo = NULL;
uint64_t qId = 0;
uint64_t qId = genQueryId();
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId);
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
@ -239,7 +240,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// current connect is broken
if (code == TSDB_CODE_SUCCESS) {
handle = qRegisterQInfo(pVnode->qMgmt, qId, (uint64_t)pQInfo);
handle = qRegisterQInfo(pVnode->qMgmt, qId, pQInfo);
if (handle == NULL) { // failed to register qhandle
pRsp->code = terrno;
terrno = 0;

View File

@ -25,6 +25,7 @@
#include "vnodeStatus.h"
#define MAX_QUEUED_MSG_NUM 100000
#define MAX_QUEUED_MSG_SIZE 1024*1024*1024 //1GB
extern void * tsDnodeTmr;
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
@ -269,6 +270,13 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
}
}
if (tsAvailDataDirGB <= tsMinimalDataDirGB) {
vError("vgId:%d, failed to write into vwqueue since no diskspace, avail:%fGB", pVnode->vgId, tsAvailDataDirGB);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
return TSDB_CODE_VND_NO_DISKSPACE;
}
if (!vnodeInReadyOrUpdatingStatus(pVnode)) {
vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId,
vnodeStatus[pVnode->status], pVnode->refCount, pVnode);
@ -278,14 +286,17 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
}
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
if (queued > MAX_QUEUED_MSG_NUM) {
int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
if (queued > MAX_QUEUED_MSG_NUM || queuedSize > MAX_QUEUED_MSG_SIZE) {
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
if (ms > 100) ms = 100;
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms);
taosMsleep(ms);
}
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d size:%" PRId64, pVnode->vgId, pVnode->refCount,
pVnode->queuedWMsg, pVnode->queuedWMsgSize);
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
return TSDB_CODE_SUCCESS;
@ -308,7 +319,10 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
SVnodeObj *pVnode = vparam;
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued);
int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite,
pWrite->rpcMsg.ahandle, queued, queuedSize);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
@ -344,7 +358,9 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
SVnodeObj *pVnode = pWrite->pVnode;
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0;
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->queuedWMsgSize < MAX_QUEUED_MSG_SIZE &&
pVnode->flowctrlLevel <= 0)
return 0;
if (tsEnableFlowCtrl == 0) {
int32_t ms = (int32_t)pow(2, pVnode->flowctrlLevel + 2);

View File

@ -28,7 +28,8 @@
int points = 5;
int numOfTables = 3;
int tablesProcessed = 0;
int tablesInsertProcessed = 0;
int tablesSelectProcessed = 0;
int64_t st, et;
typedef struct {
@ -134,6 +135,9 @@ int main(int argc, char *argv[])
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
tablesInsertProcessed = 0;
tablesSelectProcessed = 0;
for (i = 0; i<numOfTables; ++i) {
// insert records in asynchronous API
sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i);
@ -143,10 +147,20 @@ int main(int argc, char *argv[])
printf("once insert finished, presse any key to query\n");
getchar();
while(1) {
if (tablesInsertProcessed < numOfTables) {
printf("wait for process finished\n");
sleep(1);
continue;
}
break;
}
printf("start to query...\n");
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
tablesProcessed = 0;
for (i = 0; i < numOfTables; ++i) {
// select records in asynchronous API
@ -157,14 +171,8 @@ int main(int argc, char *argv[])
printf("\nonce finished, press any key to exit\n");
getchar();
for (i = 0; i<numOfTables; ++i) {
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
}
getchar();
while(1) {
if (tablesProcessed < numOfTables) {
if (tablesSelectProcessed < numOfTables) {
printf("wait for process finished\n");
sleep(1);
continue;
@ -173,6 +181,10 @@ int main(int argc, char *argv[])
break;
}
for (i = 0; i<numOfTables; ++i) {
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
}
taos_close(taos);
free(tableList);
@ -214,8 +226,8 @@ void taos_insert_call_back(void *param, TAOS_RES *tres, int code)
}
else {
printf("%d rows data are inserted into %s\n", points, pTable->name);
tablesProcessed++;
if (tablesProcessed >= numOfTables) {
tablesInsertProcessed++;
if (tablesInsertProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables);
@ -251,15 +263,17 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
//taos_free_result(tres);
printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name);
tablesProcessed++;
if (tablesProcessed >= numOfTables) {
tablesSelectProcessed++;
if (tablesSelectProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables);
}
}
taos_free_result(tres);
}
}
void taos_select_call_back(void *param, TAOS_RES *tres, int code)
@ -276,6 +290,4 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code)
taos_cleanup();
exit(1);
}
taos_free_result(tres);
}

View File

@ -19,6 +19,10 @@ Run lua sample:
lua test.lua
```
## Run performance test:
```
time lua benchmark.lua
```
## OpenResty Dependencies
- OpenResty:
```

View File

@ -0,0 +1,67 @@
local driver = require "luaconnector"
local config = {
password = "taosdata",
host = "127.0.0.1",
port = 6030,
database = "",
user = "root",
max_packet_size = 1024 * 1024
}
local conn
local res = driver.connect(config)
if res.code ~=0 then
print("connect--- failed: "..res.error)
return
else
conn = res.conn
print("connect--- pass.")
end
local res = driver.query(conn,"drop database if exists demo")
res = driver.query(conn,"create database demo")
if res.code ~=0 then
print("create db--- failed: "..res.error)
return
else
print("create db--- pass.")
end
res = driver.query(conn,"use demo")
if res.code ~=0 then
print("select db--- failed: "..res.error)
return
else
print("select db--- pass.")
end
res = driver.query(conn,"create table m1 (ts timestamp, speed int,owner binary(20))")
if res.code ~=0 then
print("create table---failed: "..res.error)
return
else
print("create table--- pass.")
end
local base = 1617330000000
local index =0
local count = 100000
local t
while( index < count )
do
t = base + index
local q=string.format([[insert into m1 values (%d,0,'robotspace')]],t)
res = driver.query(conn,q)
if res.code ~=0 then
print("insert records failed: "..res.error)
return
else
end
index = index+1
end
print(string.format([["Done. %d records has been stored."]],count))
driver.close(conn)

View File

@ -1,2 +1,2 @@
gcc lua_connector.c -fPIC -shared -o luaconnector.so -Wall -ltaos
gcc -std=c99 lua_connector.c -fPIC -shared -o luaconnector.so -Wall -ltaos

View File

@ -1,2 +1,2 @@
gcc lua_connector51.c -fPIC -shared -o luaconnector51.so -Wall -ltaos
gcc -std=c99 lua_connector51.c -fPIC -shared -o luaconnector51.so -Wall -ltaos

View File

@ -23,7 +23,7 @@ static int l_connect(lua_State *L){
luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L,-1,"host");
lua_getfield(L,1,"host");
if (lua_isstring(L,-1)){
host = lua_tostring(L, -1);
// printf("host = %s\n", host);

View File

@ -17247,3 +17247,88 @@
fun:_PyEval_EvalFrameDefault
fun:_PyEval_EvalCodeWithName
}
{
<insert_a_suppression_name_here>
Memcheck:Leak
match-leak-kinds: definite
fun:malloc
obj:/usr/lib/python3/dist-packages/_cffi_backend.cpython-38-x86_64-linux-gnu.so
obj:/usr/lib/python3/dist-packages/_cffi_backend.cpython-38-x86_64-linux-gnu.so
fun:_PyEval_EvalFrameDefault
fun:_PyFunction_Vectorcall
fun:_PyEval_EvalFrameDefault
fun:_PyEval_EvalCodeWithName
fun:_PyEval_EvalCodeWithName
obj:/usr/bin/python3.8
obj:/usr/bin/python3.8
fun:PyVectorcall_Call
fun:_PyEval_EvalFrameDefault
}
{
<insert_a_suppression_name_here>
Memcheck:Leak
match-leak-kinds: definite
fun:malloc
obj:/usr/lib/python3/dist-packages/_cffi_backend.cpython-38-x86_64-linux-gnu.so
obj:/usr/lib/python3/dist-packages/_cffi_backend.cpython-38-x86_64-linux-gnu.so
obj:/usr/lib/python3/dist-packages/_cffi_backend.cpython-38-x86_64-linux-gnu.so
obj:/usr/bin/python3.8
fun:_PyEval_EvalFrameDefault
fun:_PyFunction_Vectorcall
fun:_PyEval_EvalFrameDefault
obj:/usr/bin/python3.8
fun:_PyEval_EvalFrameDefault
obj:/usr/bin/python3.8
fun:_PyEval_EvalFrameDefault
}
{
<insert_a_suppression_name_here>
Memcheck:Leak
match-leak-kinds: definite
fun:malloc
obj:/usr/lib/python3/dist-packages/_cffi_backend.cpython-38-x86_64-linux-gnu.so
obj:/usr/lib/python3/dist-packages/_cffi_backend.cpython-38-x86_64-linux-gnu.so
fun:_PyEval_EvalFrameDefault
fun:_PyFunction_Vectorcall
fun:_PyEval_EvalFrameDefault
fun:_PyEval_EvalCodeWithName
fun:PyEval_EvalCode
obj:/usr/bin/python3.8
obj:/usr/bin/python3.8
fun:PyVectorcall_Call
fun:_PyEval_EvalFrameDefault
}
{
<insert_a_suppression_name_here>
Memcheck:Leak
match-leak-kinds: definite
fun:malloc
obj:/usr/lib/python3/dist-packages/_cffi_backend.cpython-38-x86_64-linux-gnu.so
obj:/usr/lib/python3/dist-packages/_cffi_backend.cpython-38-x86_64-linux-gnu.so
obj:/usr/bin/python3.8
fun:_PyEval_EvalFrameDefault
fun:_PyFunction_Vectorcall
fun:_PyEval_EvalFrameDefault
obj:/usr/bin/python3.8
fun:_PyEval_EvalFrameDefault
obj:/usr/bin/python3.8
fun:_PyEval_EvalFrameDefault
fun:_PyEval_EvalCodeWithName
}
{
<insert_a_suppression_name_here>
Memcheck:Leak
match-leak-kinds: definite
fun:malloc
obj:/usr/lib/python3/dist-packages/_cffi_backend.cpython-38-x86_64-linux-gnu.so
obj:/usr/bin/python3.8
obj:/usr/bin/python3.8
obj:/usr/bin/python3.8
fun:PyObject_CallMethod
fun:PyInit__openssl
fun:_PyImport_LoadDynamicModuleWithSpec
obj:/usr/bin/python3.8
obj:/usr/bin/python3.8
fun:PyVectorcall_Call
fun:_PyEval_EvalFrameDefault
}

View File

@ -258,6 +258,8 @@ python3 test.py -f subscribe/singlemeter.py
#python3 test.py -f subscribe/stability.py
python3 test.py -f subscribe/supertable.py
# topic
python3 ./test.py -f topic/topicQuery.py
#======================p3-end===============
#======================p4-start===============

View File

@ -36,12 +36,7 @@ for defiMemError in `grep 'definitely lost:' crash_gen-definitely-lost-out.log |
do
defiMemError=(${defiMemError//,/})
if [ -n "$defiMemError" ]; then
if [ "$defiMemError" -gt 0 -a "$defiMemError" -lt 1013 ]; then
cat valgrind.err
echo -e "${RED} ## Memory errors number valgrind reports \
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
exit 8
elif [ "$defiMemError" -gt 1013 ];then #add for azure
if [ "$defiMemError" -gt 0 ]; then
cat valgrind.err
echo -e "${RED} ## Memory errors number valgrind reports \
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"

View File

@ -53,12 +53,7 @@ for defiMemError in `grep 'definitely lost:' taosd-definitely-lost-out.log | awk
do
defiMemError=(${defiMemError//,/})
if [ -n "$defiMemError" ]; then
if [ "$defiMemError" -gt 0 -a "$defiMemError" -lt 1013 ]; then
cat $VALGRIND_ERR
echo -e "${RED} ## Memory errors number valgrind reports \
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
exit 8
elif [ "$defiMemError" -gt 1013 ];then #add for azure
if [ "$defiMemError" -gt 0 ]; then
cat $VALGRIND_ERR
echo -e "${RED} ## Memory errors number valgrind reports \
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"

View File

@ -43,6 +43,9 @@ class TDTestCase:
tdSql.query("select * from tb")
tdSql.checkRows(insertRows + 4)
# test case for https://jira.taosdata.com:18080/browse/TD-3716:
tdSql.error("insert into tb(now, 1)")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)

View File

@ -0,0 +1,91 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1538548685000
def run(self):
tdSql.prepare()
# test case for https://jira.taosdata.com:18080/browse/TD-3679
print("==============step1")
tdSql.execute(
"create topic tq_test partitions 10")
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(0, %d, 'aaaa')" % self.ts)
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(1, %d, 'aaaa')" % (self.ts + 1))
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(2, %d, 'aaaa')" % (self.ts + 2))
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(3, %d, 'aaaa')" % (self.ts + 3))
print("==============step2")
tdSql.query("select * from tq_test.p1")
tdSql.checkRows(4)
tdSql.query("select * from tq_test.p1 where ts >= %d" % self.ts)
tdSql.checkRows(4)
tdSql.query("select * from tq_test.p1 where ts > %d" % self.ts)
tdSql.checkRows(3)
tdSql.query("select * from tq_test.p1 where ts = %d" % self.ts)
tdSql.checkRows(1)
tdSql.execute("use db")
tdSql.execute("create table test(ts timestamp, start timestamp, value int)")
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts, self.ts))
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 1, self.ts + 1))
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 2, self.ts + 2))
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 3, self.ts + 3))
tdSql.query("select * from test")
tdSql.checkRows(4)
tdSql.query("select * from test where ts >= %d" % self.ts)
tdSql.checkRows(4)
tdSql.query("select * from test where ts > %d" % self.ts)
tdSql.checkRows(3)
tdSql.query("select * from test where ts = %d" % self.ts)
tdSql.checkRows(1)
tdSql.query("select * from test where start >= %d" % self.ts)
tdSql.checkRows(4)
tdSql.query("select * from test where start > %d" % self.ts)
tdSql.checkRows(3)
tdSql.query("select * from test where start = %d" % self.ts)
tdSql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -220,6 +220,7 @@ sql_error select sum(c3), ts, c2 from group_tb0 where c1 < 20 group by c1;
sql_error select sum(c3), first(ts), c2 from group_tb0 where c1 < 20 group by c1;
sql_error select first(c3), ts, c1, c2 from group_tb0 where c1 < 20 group by c1;
sql_error select first(c3), last(c3), ts, c1 from group_tb0 where c1 < 20 group by c1;
sql_error select ts from group_tb0 group by c1;
#===========================interval=====not support======================
sql_error select count(*), c1 from group_tb0 where c1<20 interval(1y) group by c1;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -80,6 +80,7 @@ function runSimCaseOneByOne {
fi
done < $1
}
function runSimCaseOneByOnefq {
start=`sed -n "/$1-start/=" jenkins/basic.txt`
@ -155,6 +156,7 @@ function runPyCaseOneByOne {
fi
done < $1
}
function runPyCaseOneByOnefq() {
cd $tests_dir/pytest
if [[ $1 =~ full ]] ; then
@ -202,13 +204,37 @@ function runPyCaseOneByOnefq() {
rm -rf ../../sim/case.log
}
######################
# main entry
######################
unameOut="$(uname -s)"
case "${unameOut}" in
Linux*) OS=Linux;;
Darwin*) OS=Darwin;;
CYGWIN*) OS=Windows;;
*) OS=Unknown;;
esac
case "${OS}" in
Linux*) TAOSLIB=libtaos.so;;
Darwin*) TAOSLIB=libtaos.dylib;;
Windows*) TAOSLIB=taos.dll;;
Unknown) TAOSLIB="UNKNOWN:${unameOut}";;
esac
echo TAOSLIB is ${TAOSLIB}
totalFailed=0
totalPyFailed=0
totalJDBCFailed=0
totalUnitFailed=0
totalExampleFailed=0
corepath=`grep -oP '.*(?=core_)' /proc/sys/kernel/core_pattern||grep -oP '.*(?=core-)' /proc/sys/kernel/core_pattern`
if [ "${OS}" == "Linux" ]; then
corepath=`grep -oP '.*(?=core_)' /proc/sys/kernel/core_pattern||grep -oP '.*(?=core-)' /proc/sys/kernel/core_pattern`
fi
if [ "$2" != "jdbc" ] && [ "$2" != "python" ] && [ "$2" != "unit" ] && [ "$2" != "example" ]; then
echo "### run TSIM test case ###"
cd $tests_dir/script
@ -290,11 +316,11 @@ if [ "$2" != "sim" ] && [ "$2" != "jdbc" ] && [ "$2" != "unit" ] && [ "$2" != "
fi
TOP_DIR=`pwd`
TAOSLIB_DIR=`find . -name "libtaos.so"|grep -w lib|head -n1`
TAOSLIB_DIR=`find . -name "${TAOSLIB}"|grep -w lib|head -n1`
if [[ "$TAOSLIB_DIR" == *"$IN_TDINTERNAL"* ]]; then
LIB_DIR=`find . -name "libtaos.so"|grep -w lib|head -n1|cut -d '/' --fields=2,3,4,5`
LIB_DIR=`find . -name "${TAOSLIB}"|grep -w lib|head -n1|cut -d '/' -f 2,3,4,5`
else
LIB_DIR=`find . -name "libtaos.so"|grep -w lib|head -n1|cut -d '/' --fields=2,3,4`
LIB_DIR=`find . -name "${TAOSLIB}"|grep -w lib|head -n1|cut -d '/' -f 2,3,4`
fi
export LD_LIBRARY_PATH=$TOP_DIR/$LIB_DIR:$LD_LIBRARY_PATH
@ -499,7 +525,9 @@ if [ "$2" != "sim" ] && [ "$2" != "python" ] && [ "$2" != "jdbc" ] && [ "$2" !=
echo -e "\n${RED} ### Total $totalExampleFailed examples failed! ### ${NC}"
fi
if [ "${OS}" == "Linux" ]; then
dohavecore 1
fi
fi