Merge remote-tracking branch 'upstream/develop' into odbc2

This commit is contained in:
freemine 2021-03-14 09:34:02 +08:00
commit 332a9f05a0
26 changed files with 1883 additions and 1341 deletions

3
Jenkinsfile vendored
View File

@ -46,6 +46,7 @@ def pre_test(){
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|//src//connector|Jenkinsfile' || exit 0
find ${WKC}/tests/pytest -name \'*\'.sql -exec rm -rf {} \\;
cd ${WK}
git reset --hard HEAD~10
git checkout develop
@ -115,7 +116,6 @@ pipeline {
sh '''
date
cd ${WKC}/tests
find pytest -name '*'sql|xargs rm -rf
./test-all.sh p1
date'''
}
@ -131,7 +131,6 @@ pipeline {
sh '''
date
cd ${WKC}/tests
find pytest -name '*'sql|xargs rm -rf
./test-all.sh p2
date'''
}

View File

@ -123,6 +123,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
@ -153,7 +154,6 @@ SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_F
SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t index);
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);

View File

@ -198,9 +198,10 @@ typedef struct STableDataBlocks {
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
STimeWindow window; // query time window
SInterval interval;
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SSqlGroupbyExpr groupbyExpr; // group by tags info
SArray * colList; // SArray<SColumn*>
@ -232,6 +233,7 @@ typedef struct SQueryInfo {
typedef struct {
int command;
uint8_t msgType;
char reserve1[3]; // fix bus error on arm32
bool autoCreated; // create table if it is not existed during retrieve table meta in mnode
union {
@ -244,8 +246,10 @@ typedef struct {
char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished;
char reserve2[3]; // fix bus error on arm32
int16_t numOfCols;
char reserve3[2]; // fix bus error on arm32
uint32_t allocSize;
char * payload;
int32_t payloadLen;
@ -255,7 +259,9 @@ typedef struct {
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
char reserve4[3]; // fix bus error on arm32
int8_t submitSchema; // submit block is built with table schema
char reserve5[3]; // fix bus error on arm32
STagData tagData; // NOTE: pTagData->data is used as a variant length array
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
@ -397,7 +403,6 @@ typedef struct SSqlStream {
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj);
void tscReleaseRpc(void *param);
void tscInitMsgsFp();

View File

@ -307,7 +307,8 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload,
return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
}
*((float *)payload) = (float)dv;
// *((float *)payload) = (float)dv;
SET_FLOAT_VAL(payload, dv);
}
break;

View File

@ -77,7 +77,7 @@ static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SC
static uint8_t convertOptr(SStrToken *pToken);
static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool intervalQuery);
static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool timeWindowQuery);
static bool validateIpAddress(const char* ip, size_t size);
static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
@ -86,8 +86,8 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
static int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd);
static int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t parseSlidingClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken);
static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding);
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem);
@ -128,7 +128,11 @@ static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index);
static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, int64_t *uid);
static bool validateDebugFlag(int32_t flag);
static bool validateDebugFlag(int32_t v);
static bool isTimeWindowQuery(SQueryInfo* pQueryInfo) {
return pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0;
}
int16_t getNewResColId(SQueryInfo* pQueryInfo) {
return pQueryInfo->resColumnId--;
@ -701,21 +705,86 @@ static bool isTopBottomQuery(SQueryInfo* pQueryInfo) {
return false;
}
int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
// need to add timestamp column in result set, if it is a time window query
static int32_t addPrimaryTsColumnForTimeWindowQuery(SQueryInfo* pQueryInfo) {
uint64_t uid = tscSqlExprGet(pQueryInfo, 0)->uid;
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
if (pTableMetaInfo->pTableMeta->id.uid == uid) {
tableIndex = i;
break;
}
}
if (tableIndex == COLUMN_INDEX_INITIAL_VAL) {
return TSDB_CODE_TSC_INVALID_SQL;
}
SSchema s = {.bytes = TSDB_KEYSIZE, .type = TSDB_DATA_TYPE_TIMESTAMP, .colId = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tstrncpy(s.name, aAggs[TSDB_FUNC_TS].name, sizeof(s.name));
SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS, &index, &s, TSDB_COL_NORMAL);
return TSDB_CODE_SUCCESS;
}
static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "invalid query expression";
const char* msg2 = "top/bottom query does not support order by value in time window query";
// for top/bottom + interval query, we do not add additional timestamp column in the front
if (isTopBottomQuery(pQueryInfo)) {
// invalid sql:
// top(col, k) from table_name [interval(1d)|session(ts, 1d)] order by k asc
// order by normal column is not supported
int32_t colId = pQueryInfo->order.orderColId;
if (isTimeWindowQuery(pQueryInfo) && colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
return TSDB_CODE_SUCCESS;
}
/*
* invalid sql:
* select count(tbname)/count(tag1)/count(tag2) from super_table_name [interval(1d)|session(ts, 1d)];
*/
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
}
/*
* invalid sql:
* select tbname, tags_fields from super_table_name [interval(1s)|session(ts,1s)]
*/
if (tscQueryTags(pQueryInfo) && isTimeWindowQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
return addPrimaryTsColumnForTimeWindowQuery(pQueryInfo);
}
int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
const char* msg2 = "interval cannot be less than 10 ms";
const char* msg3 = "sliding cannot be used without interval";
const char* msg4 = "top/bottom query does not support order by value in interval query";
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (pQuerySql->interval.type == 0 || pQuerySql->interval.n == 0) {
if (pQuerySql->interval.interval.type == 0 || pQuerySql->interval.interval.n == 0) {
if (pQuerySql->sliding.n > 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
return TSDB_CODE_SUCCESS;
}
@ -725,7 +794,7 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ
}
// interval is not null
SStrToken* t = &pQuerySql->interval;
SStrToken* t = &pQuerySql->interval.interval;
if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.interval, &pQueryInfo->interval.intervalUnit) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
@ -742,78 +811,64 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ
}
}
// for top/bottom + interval query, we do not add additional timestamp column in the front
if (isTopBottomQuery(pQueryInfo)) {
if (parseOffsetClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if (parseIntervalOffset(pCmd, pQueryInfo, &pQuerySql->interval.offset) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if (parseSlidingClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if (parseSlidingClause(pCmd, pQueryInfo, &pQuerySql->sliding) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
int32_t colId = pQueryInfo->order.orderColId;
if (pQueryInfo->interval.interval > 0 && colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
// The following part is used to check for the invalid query expression.
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
}
int32_t parseSessionClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL * pQuerySql) {
const char* msg1 = "gap should be fixed time window";
const char* msg2 = "only one type time window allowed";
const char* msg3 = "invalid column name";
const char* msg4 = "invalid time window";
// no session window
if (pQuerySql->sessionVal.gap.n == 0 || pQuerySql->sessionVal.col.n == 0) {
return TSDB_CODE_SUCCESS;
}
/*
* check invalid SQL:
* select count(tbname)/count(tag1)/count(tag2) from super_table_name interval(1d);
*/
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
SStrToken* col = &pQuerySql->sessionVal.col;
SStrToken* gap = &pQuerySql->sessionVal.gap;
char timeUnit = 0;
if (parseNatualDuration(gap->z, gap->n, &pQueryInfo->sessionWindow.gap, &timeUnit) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
/*
* check invalid SQL:
* select tbname, tags_fields from super_table_name interval(1s)
*/
if (tscQueryTags(pQueryInfo) && pQueryInfo->interval.interval > 0) {
if (timeUnit == 'y' || timeUnit == 'n') {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
// need to add timestamp column in result set, if interval is existed
uint64_t uid = tscSqlExprGet(pQueryInfo, 0)->uid;
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
if (pTableMetaInfo->pTableMeta->id.uid == uid) {
tableIndex = i;
break;
}
// if the unit of time window value is millisecond, change the value from microsecond
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
pQueryInfo->sessionWindow.gap = pQueryInfo->sessionWindow.gap / 1000;
}
if (tableIndex == COLUMN_INDEX_INITIAL_VAL) {
return TSDB_CODE_TSC_INVALID_SQL;
if (pQueryInfo->sessionWindow.gap != 0 && pQueryInfo->interval.interval != 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
SSchema s = {.bytes = TSDB_KEYSIZE, .type = TSDB_DATA_TYPE_TIMESTAMP, .colId = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tstrncpy(s.name, aAggs[TSDB_FUNC_TS].name, sizeof(s.name));
SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS, &index, &s, TSDB_COL_NORMAL);
if (parseOffsetClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (parseSlidingClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
pQueryInfo->sessionWindow.primaryColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
return TSDB_CODE_SUCCESS;
// The following part is used to check for the invalid query expression.
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
}
int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken) {
const char* msg1 = "interval offset cannot be negative";
const char* msg2 = "interval offset should be shorter than interval";
const char* msg3 = "cannot use 'year' as offset when interval is 'month'";
@ -821,7 +876,7 @@ int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQue
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
SStrToken* t = &pQuerySql->offset;
SStrToken* t = offsetToken;
if (t->n == 0) {
pQueryInfo->interval.offsetUnit = pQueryInfo->interval.intervalUnit;
pQueryInfo->interval.offset = 0;
@ -864,20 +919,17 @@ int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQue
return TSDB_CODE_SUCCESS;
}
int32_t parseSlidingClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding) {
const char* msg0 = "sliding value too small";
const char* msg1 = "sliding value no larger than the interval value";
const char* msg2 = "sliding value can not less than 1% of interval value";
const char* msg3 = "does not support sliding when interval is natural month/year";
// const char* msg4 = "sliding not support yet in ordinary query";
const static int32_t INTERVAL_SLIDING_FACTOR = 100;
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
SStrToken* pSliding = &pQuerySql->sliding;
if (pSliding->n == 0) {
pQueryInfo->interval.slidingUnit = pQueryInfo->interval.intervalUnit;
pQueryInfo->interval.sliding = pQueryInfo->interval.interval;
@ -969,7 +1021,7 @@ static bool validateTableColumnInfo(SArray* pFieldList, SSqlCmd* pCmd) {
const char* msg1 = "first column must be timestamp";
const char* msg2 = "row length exceeds max length";
const char* msg3 = "duplicated column names";
const char* msg4 = "invalid data types";
const char* msg4 = "invalid data type";
const char* msg5 = "invalid binary/nchar column length";
const char* msg6 = "invalid column name";
@ -990,14 +1042,13 @@ static bool validateTableColumnInfo(SArray* pFieldList, SSqlCmd* pCmd) {
int32_t nLen = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
pField = taosArrayGet(pFieldList, i);
if (pField->bytes == 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
if (!isValidDataType(pField->type)) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
return false;
}
if (!isValidDataType(pField->type)) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
if (pField->bytes == 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
return false;
}
@ -1190,7 +1241,7 @@ bool validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
const char* msg1 = "too many columns";
const char* msg2 = "duplicated column names";
const char* msg3 = "column length too long";
const char* msg4 = "invalid data types";
const char* msg4 = "invalid data type";
const char* msg5 = "invalid column name";
const char* msg6 = "invalid column length";
@ -1537,7 +1588,7 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) {
return false;
}
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool intervalQuery) {
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool timeWindowQuery) {
assert(pSelection != NULL && pCmd != NULL);
const char* msg2 = "functions or others can not be mixed up";
@ -1604,7 +1655,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
addPrimaryTsColIntoResult(pQueryInfo);
}
if (!functionCompatibleCheck(pQueryInfo, joinQuery, intervalQuery)) {
if (!functionCompatibleCheck(pQueryInfo, joinQuery, timeWindowQuery)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
@ -1872,22 +1923,18 @@ void setResultColName(char* name, tSqlExprItem* pItem, int32_t functionId, SStrT
}
}
static void updateLastQueryInfoForGroupby(SQueryInfo* pQueryInfo, STableMeta* pTableMeta, int32_t functionId, int32_t index) {
if (functionId != TSDB_FUNC_LAST) { // todo refactor
return;
}
SSqlGroupbyExpr* pGroupBy = &pQueryInfo->groupbyExpr;
if (pGroupBy->numOfGroupCols > 0) {
for(int32_t k = 0; k < pGroupBy->numOfGroupCols; ++k) {
SColIndex* pIndex = taosArrayGet(pGroupBy->columnInfo, k);
if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < tscGetNumOfColumns(pTableMeta)) { // group by normal columns
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, index);
pExpr->numOfParams = 1;
pExpr->param->i64 = TSDB_ORDER_ASC;
return;
static void updateLastScanOrderIfNeeded(SQueryInfo* pQueryInfo) {
if (pQueryInfo->sessionWindow.gap > 0 || tscGroupbyColumn(pQueryInfo)) {
size_t numOfExpr = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExpr; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId != TSDB_FUNC_LAST && pExpr->functionId != TSDB_FUNC_LAST_DST) {
continue;
}
pExpr->numOfParams = 1;
pExpr->param->i64 = TSDB_ORDER_ASC;
pExpr->param->nType = TSDB_DATA_TYPE_INT;
}
}
}
@ -2156,8 +2203,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[j], cvtFunc, name, colIndex++, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex - 1);
}
} else {
@ -2181,8 +2226,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, name, colIndex + i, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex + i);
}
}
@ -2209,8 +2252,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[index.columnIndex], cvtFunc, name, colIndex, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex);
colIndex++;
}
@ -2845,23 +2886,23 @@ static bool groupbyTagsOrNull(SQueryInfo* pQueryInfo) {
return true;
}
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool intervalQuery) {
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery) {
int32_t startIdx = 0;
size_t numOfExpr = tscSqlExprNumOfExprs(pQueryInfo);
assert(numOfExpr > 0);
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, startIdx);
int32_t functionID = pExpr->functionId;
// ts function can be simultaneously used with any other functions.
int32_t functionID = pExpr->functionId;
if (functionID == TSDB_FUNC_TS || functionID == TSDB_FUNC_TS_DUMMY) {
startIdx++;
}
int32_t factor = functionCompatList[tscSqlExprGet(pQueryInfo, startIdx)->functionId];
if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_LAST_ROW && (joinQuery || intervalQuery || !groupbyTagsOrNull(pQueryInfo))) {
if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_LAST_ROW && (joinQuery || twQuery || !groupbyTagsOrNull(pQueryInfo))) {
return false;
}
@ -2889,7 +2930,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
}
}
if (functionId == TSDB_FUNC_LAST_ROW && (joinQuery || intervalQuery || !groupbyTagsOrNull(pQueryInfo))) {
if (functionId == TSDB_FUNC_LAST_ROW && (joinQuery || twQuery || !groupbyTagsOrNull(pQueryInfo))) {
return false;
}
}
@ -5919,8 +5960,8 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
SColumnList ids = getColumnList(1, 0, pColIndex->colIndex);
insertResultField(pQueryInfo, (int32_t)size, &ids, bytes, (int8_t)type, name, pExpr);
} else {
// if this query is "group by" normal column, interval is not allowed
if (pQueryInfo->interval.interval > 0) {
// if this query is "group by" normal column, time window query is not allowed
if (isTimeWindowQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
@ -5982,7 +6023,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || pQueryInfo->interval.interval > 0) {
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || isTimeWindowQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
} else {
return TSDB_CODE_SUCCESS;
@ -6557,8 +6598,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if ((pQueryInfo->interval.interval > 0) &&
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
@ -6790,10 +6830,10 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
}
int32_t joinQuery = (pQuerySql->from != NULL && taosArrayGetSize(pQuerySql->from) > 2);
int32_t timeWindowQuery = !(pQuerySql->interval.interval.type == 0 || pQuerySql->interval.interval.n == 0 ||
pQuerySql->sessionVal.gap.n == 0);
int32_t intervalQuery = !(pQuerySql->interval.type == 0 || pQuerySql->interval.n == 0);
if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable, joinQuery, intervalQuery) != TSDB_CODE_SUCCESS) {
if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable, joinQuery, timeWindowQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
@ -6806,12 +6846,16 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
if (parseIntervalClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
} else {
if ((pQueryInfo->interval.interval > 0) &&
if (isTimeWindowQuery(pQueryInfo) &&
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
if (parseSessionClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
// no result due to invalid query time range
if (pQueryInfo->window.skey > pQueryInfo->window.ekey) {
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
@ -6825,7 +6869,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
// in case of join query, time range is required.
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey);
if (timeRange == 0 && pQueryInfo->window.skey == 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
@ -6839,6 +6882,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return code;
}
updateLastScanOrderIfNeeded(pQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo);
if (pQuerySql->fillType != NULL) {

View File

@ -643,7 +643,6 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
}
pSql->epSet.inUse = rand()%pSql->epSet.numOfEps;
pQueryMsg->head.vgId = htonl(vgId);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
@ -658,8 +657,6 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(index >= 0 && index < numOfVgroups);
tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
// set the vgroup info
@ -668,7 +665,10 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables
tscDebug("%p query on stable, vgId:%d, numOfTables:%d, vgIndex:%d, numOfVgroups:%d", pSql,
pTableIdList->vgInfo.vgId, numOfTables, index, numOfVgroups);
// serialize each table id info
for(int32_t i = 0; i < numOfTables; ++i) {
STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
@ -754,6 +754,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
pQueryMsg->sqlstrLen = htonl(sqlLen);
pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen);
pQueryMsg->sw.gap = htobe64(pQueryInfo->sessionWindow.gap);
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number

View File

@ -1062,7 +1062,6 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
}

View File

@ -239,6 +239,21 @@ bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) {
return false;
}
bool tscGroupbyColumn(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
SSqlGroupbyExpr* pGroupbyExpr = &pQueryInfo->groupbyExpr;
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
SColIndex* pIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < numOfCols) { // group by normal columns
return true;
}
}
return false;
}
bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {

View File

@ -485,12 +485,13 @@ typedef struct {
int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode
SInterval interval;
SSessionWindow sw; // session window
uint16_t tagCondLen; // tag length in current query
uint32_t tbnameCondLen; // table name filter condition string length
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query.
int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query.
int16_t prjOrder; // global order in super table projection query.
int64_t limit;
int64_t offset;
@ -628,6 +629,7 @@ typedef struct {
int32_t maxtablesPerVnode;
int32_t maxVgroupsPerDb;
char arbitrator[TSDB_EP_LEN]; // tsArbitrator
char reserve[2]; // to solve arm32 bus error
char timezone[64]; // tsTimezone
int64_t checkTime; // 1970-01-01 00:00:00.000
char locale[TSDB_LOCALE_LEN]; // tsLocale

View File

@ -135,74 +135,75 @@
#define TK_FROM 116
#define TK_VARIABLE 117
#define TK_INTERVAL 118
#define TK_FILL 119
#define TK_SLIDING 120
#define TK_ORDER 121
#define TK_BY 122
#define TK_ASC 123
#define TK_DESC 124
#define TK_GROUP 125
#define TK_HAVING 126
#define TK_LIMIT 127
#define TK_OFFSET 128
#define TK_SLIMIT 129
#define TK_SOFFSET 130
#define TK_WHERE 131
#define TK_NOW 132
#define TK_RESET 133
#define TK_QUERY 134
#define TK_ADD 135
#define TK_COLUMN 136
#define TK_TAG 137
#define TK_CHANGE 138
#define TK_SET 139
#define TK_KILL 140
#define TK_CONNECTION 141
#define TK_STREAM 142
#define TK_COLON 143
#define TK_ABORT 144
#define TK_AFTER 145
#define TK_ATTACH 146
#define TK_BEFORE 147
#define TK_BEGIN 148
#define TK_CASCADE 149
#define TK_CLUSTER 150
#define TK_CONFLICT 151
#define TK_COPY 152
#define TK_DEFERRED 153
#define TK_DELIMITERS 154
#define TK_DETACH 155
#define TK_EACH 156
#define TK_END 157
#define TK_EXPLAIN 158
#define TK_FAIL 159
#define TK_FOR 160
#define TK_IGNORE 161
#define TK_IMMEDIATE 162
#define TK_INITIALLY 163
#define TK_INSTEAD 164
#define TK_MATCH 165
#define TK_KEY 166
#define TK_OF 167
#define TK_RAISE 168
#define TK_REPLACE 169
#define TK_RESTRICT 170
#define TK_ROW 171
#define TK_STATEMENT 172
#define TK_TRIGGER 173
#define TK_VIEW 174
#define TK_SEMI 175
#define TK_NONE 176
#define TK_PREV 177
#define TK_LINEAR 178
#define TK_IMPORT 179
#define TK_METRIC 180
#define TK_TBNAME 181
#define TK_JOIN 182
#define TK_METRICS 183
#define TK_INSERT 184
#define TK_INTO 185
#define TK_VALUES 186
#define TK_SESSION 119
#define TK_FILL 120
#define TK_SLIDING 121
#define TK_ORDER 122
#define TK_BY 123
#define TK_ASC 124
#define TK_DESC 125
#define TK_GROUP 126
#define TK_HAVING 127
#define TK_LIMIT 128
#define TK_OFFSET 129
#define TK_SLIMIT 130
#define TK_SOFFSET 131
#define TK_WHERE 132
#define TK_NOW 133
#define TK_RESET 134
#define TK_QUERY 135
#define TK_ADD 136
#define TK_COLUMN 137
#define TK_TAG 138
#define TK_CHANGE 139
#define TK_SET 140
#define TK_KILL 141
#define TK_CONNECTION 142
#define TK_STREAM 143
#define TK_COLON 144
#define TK_ABORT 145
#define TK_AFTER 146
#define TK_ATTACH 147
#define TK_BEFORE 148
#define TK_BEGIN 149
#define TK_CASCADE 150
#define TK_CLUSTER 151
#define TK_CONFLICT 152
#define TK_COPY 153
#define TK_DEFERRED 154
#define TK_DELIMITERS 155
#define TK_DETACH 156
#define TK_EACH 157
#define TK_END 158
#define TK_EXPLAIN 159
#define TK_FAIL 160
#define TK_FOR 161
#define TK_IGNORE 162
#define TK_IMMEDIATE 163
#define TK_INITIALLY 164
#define TK_INSTEAD 165
#define TK_MATCH 166
#define TK_KEY 167
#define TK_OF 168
#define TK_RAISE 169
#define TK_REPLACE 170
#define TK_RESTRICT 171
#define TK_ROW 172
#define TK_STATEMENT 173
#define TK_TRIGGER 174
#define TK_VIEW 175
#define TK_SEMI 176
#define TK_NONE 177
#define TK_PREV 178
#define TK_LINEAR 179
#define TK_IMPORT 180
#define TK_METRIC 181
#define TK_TBNAME 182
#define TK_JOIN 183
#define TK_METRICS 184
#define TK_INSERT 185
#define TK_INTO 186
#define TK_VALUES 187
#define TK_SPACE 300

View File

@ -72,6 +72,11 @@ typedef struct SInterval {
int64_t offset;
} SInterval;
typedef struct SSessionWindow {
int64_t gap; // gap between two session window(in microseconds)
int32_t primaryColId; // primary timestamp column
} SSessionWindow;
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);

View File

@ -63,9 +63,11 @@ void httpJsonString(JsonBuf* buf, char* sVal, int32_t len);
void httpJsonOriginString(JsonBuf* buf, char* sVal, int32_t len);
void httpJsonStringForTransMean(JsonBuf* buf, char* SVal, int32_t maxLen);
void httpJsonInt64(JsonBuf* buf, int64_t num);
void httpJsonUInt64(JsonBuf* buf, uint64_t num);
void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us);
void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us);
void httpJsonInt(JsonBuf* buf, int32_t num);
void httpJsonUInt(JsonBuf* buf, uint32_t num);
void httpJsonFloat(JsonBuf* buf, float num);
void httpJsonDouble(JsonBuf* buf, double num);
void httpJsonNull(JsonBuf* buf);

View File

@ -256,6 +256,12 @@ void httpJsonInt64(JsonBuf* buf, int64_t num) {
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%" PRId64, num);
}
void httpJsonUInt64(JsonBuf* buf, uint64_t num) {
httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%" PRIu64, num);
}
void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us) {
char ts[35] = {0};
struct tm* ptm;
@ -303,6 +309,12 @@ void httpJsonInt(JsonBuf* buf, int32_t num) {
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%d", num);
}
void httpJsonUInt(JsonBuf* buf, uint32_t num) {
httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%u", num);
}
void httpJsonFloat(JsonBuf* buf, float num) {
httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ);

View File

@ -162,6 +162,18 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
case TSDB_DATA_TYPE_BIGINT:
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_UTINYINT:
httpJsonUInt(jsonBuf, *((uint8_t *)row[i]));
break;
case TSDB_DATA_TYPE_USMALLINT:
httpJsonUInt(jsonBuf, *((uint16_t *)row[i]));
break;
case TSDB_DATA_TYPE_UINT:
httpJsonUInt(jsonBuf, *((uint32_t *)row[i]));
break;
case TSDB_DATA_TYPE_UBIGINT:
httpJsonUInt64(jsonBuf, *((uint64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
httpJsonFloat(jsonBuf, GET_FLOAT_VAL(row[i]));
break;

View File

@ -185,7 +185,6 @@ typedef struct SQuery {
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query
int32_t interBufSize; // intermediate buffer sizse
@ -196,6 +195,7 @@ typedef struct SQuery {
STimeWindow window;
SInterval interval;
SSessionWindow sw;
int16_t precision;
int16_t numOfOutput;
int16_t fillType;
@ -279,10 +279,11 @@ enum OPERATOR_TYPE_E {
OP_Groupby = 8,
OP_Limit = 9,
OP_Offset = 10,
OP_TimeInterval = 11,
OP_Fill = 12,
OP_MultiTableAggregate = 13,
OP_MultiTableTimeInterval = 14,
OP_TimeWindow = 11,
OP_SessionWindow = 12,
OP_Fill = 13,
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
};
typedef struct SOperatorInfo {
@ -411,6 +412,14 @@ typedef struct SGroupbyOperatorInfo {
char *prevData; // previous group by value
} SGroupbyOperatorInfo;
typedef struct SSWindowOperatorInfo {
SOptrBasicInfo binfo;
STimeWindow curWindow; // current time window
TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows
int32_t start; // start row index
} SSWindowOperatorInfo;
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,

View File

@ -58,14 +58,19 @@ typedef struct SIntervalVal {
SStrToken offset;
} SIntervalVal;
typedef struct SSessionWindowVal {
SStrToken col;
SStrToken gap;
} SSessionWindowVal;
typedef struct SQuerySQL {
struct tSQLExprList *pSelection; // select clause
SArray * from; // from clause SArray<tVariantListItem>
struct tSQLExpr * pWhere; // where clause [optional]
SArray * pGroupby; // groupby clause, only for tags[optional], SArray<tVariantListItem>
SArray * pSortOrder; // orderby [optional], SArray<tVariantListItem>
SStrToken interval; // interval [optional]
SStrToken offset; // offset window [optional]
SIntervalVal interval; // (interval, interval_offset) [optional]
SSessionWindowVal sessionVal; // session window [optional]
SStrToken sliding; // sliding window [optional]
SLimitVal limit; // limit offset [optional]
SLimitVal slimit; // group limit offset [optional]
@ -256,8 +261,8 @@ tSQLExprList *tSqlExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SStrToken
void tSqlExprListDestroy(tSQLExprList *pList);
SQuerySQL *tSetQuerySqlElems(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SQuerySQL *tSetQuerySqlNode(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pGLimit);
SCreateTableSQL *tSetCreateSqlElems(SArray *pCols, SArray *pTags, SQuerySQL *pSelect, int32_t type);

View File

@ -326,8 +326,8 @@ signed(A) ::= PLUS INTEGER(X). { A = strtol(X.z, NULL, 10); }
signed(A) ::= MINUS INTEGER(X). { A = -strtol(X.z, NULL, 10);}
////////////////////////////////// The CREATE TABLE statement ///////////////////////////////
cmd ::= CREATE TABLE create_table_args. {}
cmd ::= CREATE TABLE create_stable_args. {}
cmd ::= CREATE TABLE create_table_args. {}
cmd ::= CREATE TABLE create_stable_args. {}
cmd ::= CREATE STABLE create_stable_args. {}
cmd ::= CREATE TABLE create_table_list(Z). { pInfo->type = TSDB_SQL_CREATE_TABLE; pInfo->pCreateTableInfo = Z;}
@ -452,8 +452,8 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
//////////////////////// The SELECT statement /////////////////////////////////
%type select {SQuerySQL*}
%destructor select {doDestroyQuerySql($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlElems(&T, W, X, Y, P, Z, &K, &S, F, &L, &G);
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G);
}
%type union {SSubclauseInfo*}
@ -471,7 +471,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); }
// select server_version(), select client_version(),
// select server_state();
select(A) ::= SELECT(T) selcollist(W). {
A = tSetQuerySqlElems(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
}
// selcollist is a list of expressions that are to become the return
@ -546,13 +546,21 @@ tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z) ids(F). {
tmvar(A) ::= VARIABLE(X). {A = X;}
%type interval_opt {SIntervalVal}
interval_opt(N) ::= INTERVAL LP tmvar(E) RP. {N.interval = E; N.offset.n = 0; N.offset.z = NULL; N.offset.type = 0;}
interval_opt(N) ::= INTERVAL LP tmvar(E) COMMA tmvar(O) RP. {N.interval = E; N.offset = O;}
interval_opt(N) ::= INTERVAL LP tmvar(E) RP. {N.interval = E; N.offset.n = 0;}
interval_opt(N) ::= INTERVAL LP tmvar(E) COMMA tmvar(X) RP. {N.interval = E; N.offset = X;}
interval_opt(N) ::= . {memset(&N, 0, sizeof(N));}
%type session_option {SSessionWindowVal}
session_option(X) ::= . {X.col.n = 0; X.gap.n = 0;}
session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. {
V.n += Z.n;
X.col = V;
X.gap = Y;
}
%type fill_opt {SArray*}
%destructor fill_opt {taosArrayDestroy($$);}
fill_opt(N) ::= . {N = 0; }
fill_opt(N) ::= . { N = 0; }
fill_opt(N) ::= FILL LP ID(Y) COMMA tagitemlist(X) RP. {
tVariant A = {0};
toTSDBType(Y.type);

View File

@ -171,6 +171,7 @@ static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
@ -1254,8 +1255,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows,
numOfOutput);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput);
}
// restore current time window
@ -1268,8 +1268,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// window start key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows,
numOfOutput);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
STimeWindow nextWin = win;
while (1) {
@ -1287,13 +1286,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep =
getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
}
if (pQuery->timeWindowInterpo) {
@ -1323,6 +1320,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
continue;
}
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (pInfo->prevData == NULL || (memcmp(pInfo->prevData, val, bytes) != 0)) {
if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes);
@ -1347,6 +1345,66 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
}
}
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->pQuery->current;
// primary timestamp column
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SOptrBasicInfo* pBInfo = &pInfo->binfo;
int64_t gap = pOperator->pRuntimeEnv->pQuery->sw.gap;
pInfo->numOfRows = 0;
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
if (pInfo->prevTs == INT64_MIN) {
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j];
pInfo->numOfRows = 1;
pInfo->start = j;
} else if (tsList[j] - pInfo->prevTs <= gap) {
pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j];
pInfo->numOfRows += 1;
pInfo->start = j;
} else { // start a new session window
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j];
pInfo->numOfRows = 1;
pInfo->start = j;
}
}
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
}
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
int64_t v = -1;
GET_TYPED_DATA(v, int64_t, type, pData);
@ -1700,6 +1758,13 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
} else if (pQuery->sw.gap > 0) {
pRuntimeEnv->proot = createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
@ -2482,7 +2547,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block.
if (pQuery->numOfFilterCols > 0 || pQuery->groupbyColumn ||
if (pQuery->numOfFilterCols > 0 || pQuery->groupbyColumn || pQuery->sw.gap > 0 ||
(QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, &pBlock->info))) {
(*status) = BLK_DATA_ALL_NEEDED;
}
@ -3039,7 +3104,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t numOfOutput = pOperator->numOfOutput;
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery) || pQuery->sw.gap > 0) {
// for each group result, call the finalize function for each column
if (pQuery->groupbyColumn) {
closeAllResultRows(pResultRowInfo);
@ -4240,7 +4305,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pCtx = pAggInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_TimeInterval) {
} else if (pDownstream->operatorType == OP_TimeWindow) {
STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info;
pTableScanInfo->pCtx = pIntervalInfo->pCtx;
@ -4264,6 +4329,12 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} else if (pDownstream->operatorType == OP_Arithmetic) {
SArithOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_SessionWindow) {
SSWindowOperatorInfo* pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
@ -4619,6 +4690,62 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
return pIntervalInfo->pRes;
}
static SSDataBlock* doSessionWindowAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SSWindowOperatorInfo* pWindowInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes;
}
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
STimeWindow win = pQuery->window;
SOperatorInfo* upstream = pOperator->upstream;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
if (pBlock == NULL) {
break;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQuery->order.order);
doSessionWindowAggImpl(pOperator, pWindowInfo, pBlock);
}
// restore the value
pQuery->order.order = order;
pQuery->window = win;
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* hashGroupbyAggregate(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
@ -4908,7 +5035,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = OP_TimeInterval;
pOperator->operatorType = OP_TimeWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
@ -4922,6 +5049,31 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
return pOperator;
}
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->prevTs = INT64_MIN;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = OP_SessionWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSessionWindowAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator;
}
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
@ -4971,7 +5123,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
}
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput) {
int32_t numOfOutput) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
@ -5203,6 +5355,17 @@ static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) {
return false;
}
if (pQueryMsg->sw.gap < 0 || pQueryMsg->sw.primaryColId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
qError("qmsg:%p illegal value of session window time %" PRId64, pQueryMsg, pQueryMsg->sw.gap);
return false;
}
if (pQueryMsg->sw.gap > 0 && pQueryMsg->interval.interval > 0) {
qError("qmsg:%p illegal value of session window time %" PRId64" and interval value %"PRId64, pQueryMsg,
pQueryMsg->sw.gap, pQueryMsg->interval.interval);
return false;
}
if (pQueryMsg->numOfTables <= 0) {
qError("qmsg:%p illegal value of numOfTables %d", pQueryMsg, pQueryMsg->numOfTables);
return false;
@ -5315,6 +5478,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput);
pQueryMsg->sqlstrLen = htonl(pQueryMsg->sqlstrLen);
pQueryMsg->prevResultLen = htonl(pQueryMsg->prevResultLen);
pQueryMsg->sw.gap = htobe64(pQueryMsg->sw.gap);
pQueryMsg->sw.primaryColId = htonl(pQueryMsg->sw.primaryColId);
// query msg safety check
if (!validateQueryMsg(pQueryMsg)) {
@ -5953,7 +6118,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
pQuery->tagColList = pTagCols;
pQuery->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit;
pQuery->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX;
pQuery->sw = pQueryMsg->sw;
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) {
goto _cleanup;
@ -6023,7 +6188,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
pQuery->queryWindowIdentical = true;
bool groupByCol = isGroupbyColumn(pQuery->pGroupbyExpr);
STimeWindow window = pQuery->window;
@ -6042,11 +6206,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
for(int32_t j = 0; j < s; ++j) {
STableKeyInfo* info = taosArrayGet(pa, j);
window.skey = info->lastKey;
if (info->lastKey != pQuery->window.skey) {
pQInfo->query.queryWindowIdentical = false;
}
void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
STableQueryInfo* item = createTableQueryInfo(pQuery, info->pTable, groupByCol, window, buf);

View File

@ -550,8 +550,8 @@ void tSqlSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
/*
* extract the select info out of sql string
*/
SQuerySQL *tSetQuerySqlElems(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SQuerySQL *tSetQuerySqlNode(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pGLimit) {
assert(pSelection != NULL);
@ -574,14 +574,17 @@ SQuerySQL *tSetQuerySqlElems(SStrToken *pSelectToken, tSQLExprList *pSelection,
}
if (pInterval != NULL) {
pQuery->interval = pInterval->interval;
pQuery->offset = pInterval->offset;
pQuery->interval = *pInterval;
}
if (pSliding != NULL) {
pQuery->sliding = *pSliding;
}
if (pSession != NULL) {
pQuery->sessionVal = *pSession;
}
pQuery->fillType = pFill;
return pQuery;
}

View File

@ -139,6 +139,7 @@ static SKeyword keywordTable[] = {
{"FROM", TK_FROM},
{"VARIABLE", TK_VARIABLE},
{"INTERVAL", TK_INTERVAL},
{"SESSION", TK_SESSION},
{"FILL", TK_FILL},
{"SLIDING", TK_SLIDING},
{"ORDER", TK_ORDER},

View File

@ -42,12 +42,11 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
}
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) {
pResultRowInfo->capacity = size;
pResultRowInfo->type = type;
pResultRowInfo->curIndex = -1;
pResultRowInfo->type = type;
pResultRowInfo->size = 0;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
pResultRowInfo->curIndex = -1;
pResultRowInfo->capacity = size;
pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES);
if (pResultRowInfo->pResult == NULL) {

File diff suppressed because it is too large Load Diff

View File

@ -372,7 +372,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// kill current query and free corresponding resources.
if (pRetrieve->free == 1) {
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);

View File

@ -183,6 +183,7 @@ python3 ./test.py -f query/isNullTest.py
python3 ./test.py -f query/queryWithTaosdKilled.py
python3 ./test.py -f query/floatCompare.py
python3 ./test.py -f query/queryGroupbySort.py
python3 ./test.py -f query/queryBetweenAnd.py
#stream
python3 ./test.py -f stream/metric_1.py

View File

@ -26,6 +26,8 @@ class TDTestCase:
self.rowNum = 10
self.ts = 1537146000000
self.stb_prefix = 's'
self.subtb_prefix = 't'
def run(self):
tdSql.prepare()
@ -85,6 +87,33 @@ class TDTestCase:
tdSql.query("select stddev(col6) from test1")
tdSql.checkData(0, 0, np.std(floatData))
#add for td-3276
sql="create table s (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool,c8 binary(20),c9 nchar(20),c11 int unsigned,c12 smallint unsigned,c13 tinyint unsigned,c14 bigint unsigned) \
tags(t1 int, t2 float, t3 bigint, t4 smallint, t5 tinyint, t6 double, t7 bool,t8 binary(20),t9 nchar(20), t10 int unsigned , t11 smallint unsigned , t12 tinyint unsigned , t13 bigint unsigned)"
tdSql.execute(sql)
for j in range(2):
if j % 2 == 0:
sql = "create table %s using %s tags(NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)" % \
(self.subtb_prefix+str(j)+'_'+str(j),self.stb_prefix)
else:
sql = "create table %s using %s tags(%d,%d,%d,%d,%d,%d,%d,'%s','%s',%d,%d,%d,%d)" % \
(self.subtb_prefix+str(j)+'_'+str(j),self.stb_prefix,j,j/2.0,j%41,j%51,j%53,j*1.0,j%2,'taos'+str(j),'涛思'+str(j), j%43, j%23 , j%17 , j%3167)
tdSql.execute(sql)
for i in range(10):
if i % 5 == 0 :
ret = tdSql.execute(
"insert into %s values (%d , NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)" %
(self.subtb_prefix+str(j)+'_'+str(j), self.ts+i))
else:
ret = tdSql.execute(
"insert into %s values (%d , %d,%d,%d,%d,%d,%d,%d,'%s','%s',%d,%d,%d,%d)" %
(self.subtb_prefix+str(j)+'_'+str(j), self.ts+i, i%100, i/2.0, i%41, i%51, i%53, i*1.0, i%2,'taos'+str(i),'涛思'+str(i), i%43, i%23 , i%17 , i%3167))
for i in range(13):
tdSql.query('select stddev(c4) from s group by t%s' % str(i+1) )
def stop(self):
tdSql.close()

View File

@ -0,0 +1,206 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import taos
import sys
from util.log import *
from util.sql import *
from util.cases import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table")
tdSql.execute(
'''create table if not exists supt
(ts timestamp, c1 int, c2 float, c3 bigint, c4 double, c5 smallint, c6 tinyint)
tags(location binary(64), type int, isused bool , family nchar(64))'''
)
tdSql.execute("create table t1 using supt tags('beijing', 1, 1, '自行车')")
tdSql.execute("create table t2 using supt tags('shanghai', 2, 0, '拖拉机')")
tdLog.printNoPrefix("==========step2:insert data")
for i in range(10):
tdSql.execute(
f"insert into t1 values (now+{i}m, {32767+i}, {20.0+i/10}, {2**31+i}, {3.4*10**38+i/10}, {127+i}, {i})"
)
tdSql.execute(
f"insert into t2 values (now-{i}m, {-32767-i}, {20.0-i/10}, {-i-2**31}, {-i/10-3.4*10**38}, {-127-i}, {-i})"
)
tdSql.execute(
f"insert into t1 values (now+11m, {2**31-1}, {pow(10,37)*34}, {pow(2,63)-1}, {1.7*10**308}, 32767, 127)"
)
tdSql.execute(
f"insert into t2 values (now-11m, {1-2**31}, {-3.4*10**38}, {1-2**63}, {-1.7*10**308}, -32767, -127)"
)
tdSql.execute(
f"insert into t2 values (now-12m, null , {-3.4*10**38}, null , {-1.7*10**308}, null , null)"
)
tdLog.printNoPrefix("==========step3:query timestamp type")
tdSql.query("select * from t1 where ts between now-1m and now+10m")
tdSql.checkRows(10)
tdSql.query("select * from t1 where ts between '2021-01-01 00:00:00.000' and '2121-01-01 00:00:00.000'")
tdSql.checkRows(11)
tdSql.query("select * from t1 where ts between '1969-01-01 00:00:00.000' and '1969-12-31 23:59:59.999'")
tdSql.checkRows(0)
tdSql.query("select * from t1 where ts between -2793600 and 31507199")
tdSql.checkRows(0)
tdSql.query("select * from t1 where ts between 1609430400000 and 4765104000000")
tdSql.checkRows(11)
tdLog.printNoPrefix("==========step4:query int type")
tdSql.query("select * from t1 where c1 between 32767 and 32776")
tdSql.checkRows(10)
tdSql.query("select * from t1 where c1 between 32766.9 and 32776.1")
tdSql.checkRows(10)
tdSql.query("select * from t1 where c1 between 32776 and 32767")
tdSql.checkRows(0)
tdSql.error("select * from t1 where c1 between 'a' and 'e'")
# tdSql.query("select * from t1 where c1 between 0x64 and 0x69")
# tdSql.checkRows(6)
tdSql.error("select * from t1 where c1 not between 100 and 106")
tdSql.query(f"select * from t1 where c1 between {2**31-2} and {2**31+1}")
tdSql.checkRows(1)
tdSql.error(f"select * from t2 where c1 between null and {1-2**31}")
# tdSql.checkRows(3)
tdSql.query(f"select * from t2 where c1 between {-2**31} and {1-2**31}")
tdSql.checkRows(1)
tdLog.printNoPrefix("==========step5:query float type")
tdSql.query("select * from t1 where c2 between 20.0 and 21.0")
tdSql.checkRows(10)
tdSql.query(f"select * from t1 where c2 between {-3.4*10**38-1} and {3.4*10**38+1}")
tdSql.checkRows(11)
tdSql.query("select * from t1 where c2 between 21.0 and 20.0")
tdSql.checkRows(0)
tdSql.error("select * from t1 where c2 between 'DC3' and 'SYN'")
tdSql.error("select * from t1 where c2 not between 0.1 and 0.2")
# tdSql.query(f"select * from t1 where c2 between {pow(10,38)*3.4} and {pow(10,38)*3.4+1}")
# tdSql.checkRows(1)
tdSql.query(f"select * from t2 where c2 between {-3.4*10**38-1} and {-3.4*10**38}")
tdSql.checkRows(0)
tdSql.error(f"select * from t2 where c2 between null and {-3.4*10**38}")
# tdSql.checkRows(3)
tdLog.printNoPrefix("==========step6:query bigint type")
tdSql.query(f"select * from t1 where c3 between {2**31} and {2**31+10}")
tdSql.checkRows(10)
tdSql.error(f"select * from t1 where c3 between {-2**63} and {2**63}")
# tdSql.checkRows(11)
tdSql.query(f"select * from t1 where c3 between {2**31+10} and {2**31}")
tdSql.checkRows(0)
tdSql.error("select * from t1 where c3 between 'a' and 'z'")
tdSql.error("select * from t1 where c3 not between 1 and 2")
tdSql.query(f"select * from t1 where c3 between {2**63-2} and {2**63-1}")
tdSql.checkRows(1)
tdSql.error(f"select * from t2 where c3 between {-2**63} and {1-2**63}")
# tdSql.checkRows(3)
tdSql.error(f"select * from t2 where c3 between null and {1-2**63}")
# tdSql.checkRows(2)
tdLog.printNoPrefix("==========step7:query double type")
tdSql.query(f"select * from t1 where c4 between {3.4*10**38} and {3.4*10**38+10}")
tdSql.checkRows(10)
tdSql.query(f"select * from t1 where c4 between {1.7*10**308+1} and {1.7*10**308+2}")
# 因为精度原因在超出bigint边界后数值不能进行准确的判断
# tdSql.checkRows(0)
tdSql.query(f"select * from t1 where c4 between {3.4*10**38+10} and {3.4*10**38}")
# tdSql.checkRows(0)
tdSql.error("select * from t1 where c4 between 'a' and 'z'")
tdSql.error("select * from t1 where c4 not between 1 and 2")
tdSql.query(f"select * from t1 where c4 between {1.7*10**308} and {1.7*10**308+1}")
tdSql.checkRows(1)
tdSql.query(f"select * from t2 where c4 between {-1.7*10**308-1} and {-1.7*10**308}")
# tdSql.checkRows(3)
tdSql.error(f"select * from t2 where c4 between null and {-1.7*10**308}")
# tdSql.checkRows(3)
tdLog.printNoPrefix("==========step8:query smallint type")
tdSql.query("select * from t1 where c5 between 127 and 136")
tdSql.checkRows(10)
tdSql.query("select * from t1 where c5 between 126.9 and 135.9")
tdSql.checkRows(9)
tdSql.query("select * from t1 where c5 between 136 and 127")
tdSql.checkRows(0)
tdSql.error("select * from t1 where c5 between '~' and 'ˆ'")
tdSql.error("select * from t1 where c5 not between 1 and 2")
tdSql.query("select * from t1 where c5 between 32767 and 32768")
tdSql.checkRows(1)
tdSql.query("select * from t2 where c5 between -32768 and -32767")
tdSql.checkRows(1)
tdSql.error("select * from t2 where c5 between null and -32767")
# tdSql.checkRows(1)
tdLog.printNoPrefix("==========step9:query tinyint type")
tdSql.query("select * from t1 where c6 between 0 and 9")
tdSql.checkRows(10)
tdSql.query("select * from t1 where c6 between -1.1 and 8.9")
tdSql.checkRows(9)
tdSql.query("select * from t1 where c6 between 9 and 0")
tdSql.checkRows(0)
tdSql.error("select * from t1 where c6 between 'NUL' and 'HT'")
tdSql.error("select * from t1 where c6 not between 1 and 2")
tdSql.query("select * from t1 where c6 between 127 and 128")
tdSql.checkRows(1)
tdSql.query("select * from t2 where c6 between -128 and -127")
tdSql.checkRows(1)
tdSql.error("select * from t2 where c6 between null and -127")
# tdSql.checkRows(3)
tdLog.printNoPrefix("==========step10:invalid query type")
tdSql.query("select * from supt where location between 'beijing' and 'shanghai'")
tdSql.checkRows(23)
# 非0值均解析为1因此"between 负值 and o"解析为"between 1 and 0"
tdSql.query("select * from supt where isused between 0 and 1")
tdSql.checkRows(23)
tdSql.query("select * from supt where isused between -1 and 0")
tdSql.checkRows(0)
tdSql.error("select * from supt where isused between false and true")
tdSql.query("select * from supt where family between '拖拉机' and '自行车'")
tdSql.checkRows(23)
tdLog.printNoPrefix("==========step11:query HEX/OCT/BIN type")
tdSql.error("select * from t1 where c6 between 0x7f and 0x80") # check filter HEX
tdSql.error("select * from t1 where c6 between 0b1 and 0b11111") # check filter BIN
tdSql.error("select * from t1 where c6 between 0b1 and 0x80")
tdSql.error("select * from t1 where c6=0b1")
tdSql.error("select * from t1 where c6=0x1")
# 八进制数据会按照十进制数据进行判定
tdSql.query("select * from t1 where c6 between 01 and 0200") # check filter OCT
tdSql.checkRows(10)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())