Merge pull request #5434 from taosdata/feature/qrefactor

Feature/qrefactor
This commit is contained in:
haojun Liao 2021-03-13 22:29:38 +08:00 committed by GitHub
commit 6b67d09665
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1612 additions and 1338 deletions

View File

@ -123,6 +123,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
@ -153,7 +154,6 @@ SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_F
SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t index);
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);

View File

@ -198,9 +198,10 @@ typedef struct STableDataBlocks {
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
STimeWindow window; // query time window
SInterval interval;
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SSqlGroupbyExpr groupbyExpr; // group by tags info
SArray * colList; // SArray<SColumn*>
@ -402,7 +403,6 @@ typedef struct SSqlStream {
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj);
void tscReleaseRpc(void *param);
void tscInitMsgsFp();

View File

@ -77,7 +77,7 @@ static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SC
static uint8_t convertOptr(SStrToken *pToken);
static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool intervalQuery);
static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool timeWindowQuery);
static bool validateIpAddress(const char* ip, size_t size);
static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
@ -86,8 +86,8 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
static int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd);
static int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t parseSlidingClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken);
static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding);
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem);
@ -128,7 +128,11 @@ static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index);
static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, int64_t *uid);
static bool validateDebugFlag(int32_t flag);
static bool validateDebugFlag(int32_t v);
static bool isTimeWindowQuery(SQueryInfo* pQueryInfo) {
return pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0;
}
int16_t getNewResColId(SQueryInfo* pQueryInfo) {
return pQueryInfo->resColumnId--;
@ -701,21 +705,86 @@ static bool isTopBottomQuery(SQueryInfo* pQueryInfo) {
return false;
}
int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
// need to add timestamp column in result set, if it is a time window query
static int32_t addPrimaryTsColumnForTimeWindowQuery(SQueryInfo* pQueryInfo) {
uint64_t uid = tscSqlExprGet(pQueryInfo, 0)->uid;
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
if (pTableMetaInfo->pTableMeta->id.uid == uid) {
tableIndex = i;
break;
}
}
if (tableIndex == COLUMN_INDEX_INITIAL_VAL) {
return TSDB_CODE_TSC_INVALID_SQL;
}
SSchema s = {.bytes = TSDB_KEYSIZE, .type = TSDB_DATA_TYPE_TIMESTAMP, .colId = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tstrncpy(s.name, aAggs[TSDB_FUNC_TS].name, sizeof(s.name));
SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS, &index, &s, TSDB_COL_NORMAL);
return TSDB_CODE_SUCCESS;
}
static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "invalid query expression";
const char* msg2 = "top/bottom query does not support order by value in time window query";
// for top/bottom + interval query, we do not add additional timestamp column in the front
if (isTopBottomQuery(pQueryInfo)) {
// invalid sql:
// top(col, k) from table_name [interval(1d)|session(ts, 1d)] order by k asc
// order by normal column is not supported
int32_t colId = pQueryInfo->order.orderColId;
if (isTimeWindowQuery(pQueryInfo) && colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
return TSDB_CODE_SUCCESS;
}
/*
* invalid sql:
* select count(tbname)/count(tag1)/count(tag2) from super_table_name [interval(1d)|session(ts, 1d)];
*/
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
}
/*
* invalid sql:
* select tbname, tags_fields from super_table_name [interval(1s)|session(ts,1s)]
*/
if (tscQueryTags(pQueryInfo) && isTimeWindowQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
return addPrimaryTsColumnForTimeWindowQuery(pQueryInfo);
}
int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
const char* msg2 = "interval cannot be less than 10 ms";
const char* msg3 = "sliding cannot be used without interval";
const char* msg4 = "top/bottom query does not support order by value in interval query";
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (pQuerySql->interval.type == 0 || pQuerySql->interval.n == 0) {
if (pQuerySql->interval.interval.type == 0 || pQuerySql->interval.interval.n == 0) {
if (pQuerySql->sliding.n > 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
return TSDB_CODE_SUCCESS;
}
@ -725,7 +794,7 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ
}
// interval is not null
SStrToken* t = &pQuerySql->interval;
SStrToken* t = &pQuerySql->interval.interval;
if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.interval, &pQueryInfo->interval.intervalUnit) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
@ -742,78 +811,64 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ
}
}
// for top/bottom + interval query, we do not add additional timestamp column in the front
if (isTopBottomQuery(pQueryInfo)) {
if (parseOffsetClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
if (parseIntervalOffset(pCmd, pQueryInfo, &pQuerySql->interval.offset) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if (parseSlidingClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
if (parseSlidingClause(pCmd, pQueryInfo, &pQuerySql->sliding) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
int32_t colId = pQueryInfo->order.orderColId;
if (pQueryInfo->interval.interval > 0 && colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
// The following part is used to check for the invalid query expression.
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
}
int32_t parseSessionClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL * pQuerySql) {
const char* msg1 = "gap should be fixed time window";
const char* msg2 = "only one type time window allowed";
const char* msg3 = "invalid column name";
const char* msg4 = "invalid time window";
// no session window
if (pQuerySql->sessionVal.gap.n == 0 || pQuerySql->sessionVal.col.n == 0) {
return TSDB_CODE_SUCCESS;
}
SStrToken* col = &pQuerySql->sessionVal.col;
SStrToken* gap = &pQuerySql->sessionVal.gap;
char timeUnit = 0;
if (parseNatualDuration(gap->z, gap->n, &pQueryInfo->sessionWindow.gap, &timeUnit) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
return TSDB_CODE_SUCCESS;
}
/*
* check invalid SQL:
* select count(tbname)/count(tag1)/count(tag2) from super_table_name interval(1d);
*/
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
}
/*
* check invalid SQL:
* select tbname, tags_fields from super_table_name interval(1s)
*/
if (tscQueryTags(pQueryInfo) && pQueryInfo->interval.interval > 0) {
if (timeUnit == 'y' || timeUnit == 'n') {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
// need to add timestamp column in result set, if interval is existed
uint64_t uid = tscSqlExprGet(pQueryInfo, 0)->uid;
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
if (pTableMetaInfo->pTableMeta->id.uid == uid) {
tableIndex = i;
break;
}
// if the unit of time window value is millisecond, change the value from microsecond
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
pQueryInfo->sessionWindow.gap = pQueryInfo->sessionWindow.gap / 1000;
}
if (tableIndex == COLUMN_INDEX_INITIAL_VAL) {
return TSDB_CODE_TSC_INVALID_SQL;
if (pQueryInfo->sessionWindow.gap != 0 && pQueryInfo->interval.interval != 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
SSchema s = {.bytes = TSDB_KEYSIZE, .type = TSDB_DATA_TYPE_TIMESTAMP, .colId = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tstrncpy(s.name, aAggs[TSDB_FUNC_TS].name, sizeof(s.name));
SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS, &index, &s, TSDB_COL_NORMAL);
if (parseOffsetClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (parseSlidingClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
pQueryInfo->sessionWindow.primaryColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
return TSDB_CODE_SUCCESS;
// The following part is used to check for the invalid query expression.
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
}
int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken) {
const char* msg1 = "interval offset cannot be negative";
const char* msg2 = "interval offset should be shorter than interval";
const char* msg3 = "cannot use 'year' as offset when interval is 'month'";
@ -821,7 +876,7 @@ int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQue
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
SStrToken* t = &pQuerySql->offset;
SStrToken* t = offsetToken;
if (t->n == 0) {
pQueryInfo->interval.offsetUnit = pQueryInfo->interval.intervalUnit;
pQueryInfo->interval.offset = 0;
@ -864,20 +919,17 @@ int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQue
return TSDB_CODE_SUCCESS;
}
int32_t parseSlidingClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding) {
const char* msg0 = "sliding value too small";
const char* msg1 = "sliding value no larger than the interval value";
const char* msg2 = "sliding value can not less than 1% of interval value";
const char* msg3 = "does not support sliding when interval is natural month/year";
// const char* msg4 = "sliding not support yet in ordinary query";
const static int32_t INTERVAL_SLIDING_FACTOR = 100;
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
SStrToken* pSliding = &pQuerySql->sliding;
if (pSliding->n == 0) {
pQueryInfo->interval.slidingUnit = pQueryInfo->interval.intervalUnit;
pQueryInfo->interval.sliding = pQueryInfo->interval.interval;
@ -969,7 +1021,7 @@ static bool validateTableColumnInfo(SArray* pFieldList, SSqlCmd* pCmd) {
const char* msg1 = "first column must be timestamp";
const char* msg2 = "row length exceeds max length";
const char* msg3 = "duplicated column names";
const char* msg4 = "invalid data types";
const char* msg4 = "invalid data type";
const char* msg5 = "invalid binary/nchar column length";
const char* msg6 = "invalid column name";
@ -990,14 +1042,13 @@ static bool validateTableColumnInfo(SArray* pFieldList, SSqlCmd* pCmd) {
int32_t nLen = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
pField = taosArrayGet(pFieldList, i);
if (pField->bytes == 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
if (!isValidDataType(pField->type)) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
return false;
}
if (!isValidDataType(pField->type)) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
if (pField->bytes == 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
return false;
}
@ -1190,7 +1241,7 @@ bool validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
const char* msg1 = "too many columns";
const char* msg2 = "duplicated column names";
const char* msg3 = "column length too long";
const char* msg4 = "invalid data types";
const char* msg4 = "invalid data type";
const char* msg5 = "invalid column name";
const char* msg6 = "invalid column length";
@ -1537,7 +1588,7 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) {
return false;
}
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool intervalQuery) {
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool timeWindowQuery) {
assert(pSelection != NULL && pCmd != NULL);
const char* msg2 = "functions or others can not be mixed up";
@ -1604,7 +1655,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
addPrimaryTsColIntoResult(pQueryInfo);
}
if (!functionCompatibleCheck(pQueryInfo, joinQuery, intervalQuery)) {
if (!functionCompatibleCheck(pQueryInfo, joinQuery, timeWindowQuery)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
@ -1872,22 +1923,18 @@ void setResultColName(char* name, tSqlExprItem* pItem, int32_t functionId, SStrT
}
}
static void updateLastQueryInfoForGroupby(SQueryInfo* pQueryInfo, STableMeta* pTableMeta, int32_t functionId, int32_t index) {
if (functionId != TSDB_FUNC_LAST) { // todo refactor
return;
static void updateLastScanOrderIfNeeded(SQueryInfo* pQueryInfo) {
if (pQueryInfo->sessionWindow.gap > 0 || tscGroupbyColumn(pQueryInfo)) {
size_t numOfExpr = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExpr; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId != TSDB_FUNC_LAST && pExpr->functionId != TSDB_FUNC_LAST_DST) {
continue;
}
SSqlGroupbyExpr* pGroupBy = &pQueryInfo->groupbyExpr;
if (pGroupBy->numOfGroupCols > 0) {
for(int32_t k = 0; k < pGroupBy->numOfGroupCols; ++k) {
SColIndex* pIndex = taosArrayGet(pGroupBy->columnInfo, k);
if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < tscGetNumOfColumns(pTableMeta)) { // group by normal columns
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, index);
pExpr->numOfParams = 1;
pExpr->param->i64 = TSDB_ORDER_ASC;
return;
}
pExpr->param->nType = TSDB_DATA_TYPE_INT;
}
}
}
@ -2156,8 +2203,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[j], cvtFunc, name, colIndex++, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex - 1);
}
} else {
@ -2181,8 +2226,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, name, colIndex + i, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex + i);
}
}
@ -2209,8 +2252,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[index.columnIndex], cvtFunc, name, colIndex, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex);
colIndex++;
}
@ -2845,23 +2886,23 @@ static bool groupbyTagsOrNull(SQueryInfo* pQueryInfo) {
return true;
}
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool intervalQuery) {
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery) {
int32_t startIdx = 0;
size_t numOfExpr = tscSqlExprNumOfExprs(pQueryInfo);
assert(numOfExpr > 0);
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, startIdx);
int32_t functionID = pExpr->functionId;
// ts function can be simultaneously used with any other functions.
int32_t functionID = pExpr->functionId;
if (functionID == TSDB_FUNC_TS || functionID == TSDB_FUNC_TS_DUMMY) {
startIdx++;
}
int32_t factor = functionCompatList[tscSqlExprGet(pQueryInfo, startIdx)->functionId];
if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_LAST_ROW && (joinQuery || intervalQuery || !groupbyTagsOrNull(pQueryInfo))) {
if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_LAST_ROW && (joinQuery || twQuery || !groupbyTagsOrNull(pQueryInfo))) {
return false;
}
@ -2889,7 +2930,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
}
}
if (functionId == TSDB_FUNC_LAST_ROW && (joinQuery || intervalQuery || !groupbyTagsOrNull(pQueryInfo))) {
if (functionId == TSDB_FUNC_LAST_ROW && (joinQuery || twQuery || !groupbyTagsOrNull(pQueryInfo))) {
return false;
}
}
@ -5919,8 +5960,8 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
SColumnList ids = getColumnList(1, 0, pColIndex->colIndex);
insertResultField(pQueryInfo, (int32_t)size, &ids, bytes, (int8_t)type, name, pExpr);
} else {
// if this query is "group by" normal column, interval is not allowed
if (pQueryInfo->interval.interval > 0) {
// if this query is "group by" normal column, time window query is not allowed
if (isTimeWindowQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
@ -5982,7 +6023,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || pQueryInfo->interval.interval > 0) {
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || isTimeWindowQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
} else {
return TSDB_CODE_SUCCESS;
@ -6557,8 +6598,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if ((pQueryInfo->interval.interval > 0) &&
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
@ -6790,10 +6830,10 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
}
int32_t joinQuery = (pQuerySql->from != NULL && taosArrayGetSize(pQuerySql->from) > 2);
int32_t timeWindowQuery = !(pQuerySql->interval.interval.type == 0 || pQuerySql->interval.interval.n == 0 ||
pQuerySql->sessionVal.gap.n == 0);
int32_t intervalQuery = !(pQuerySql->interval.type == 0 || pQuerySql->interval.n == 0);
if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable, joinQuery, intervalQuery) != TSDB_CODE_SUCCESS) {
if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable, joinQuery, timeWindowQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
@ -6806,12 +6846,16 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
if (parseIntervalClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
} else {
if ((pQueryInfo->interval.interval > 0) &&
if (isTimeWindowQuery(pQueryInfo) &&
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
if (parseSessionClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
// no result due to invalid query time range
if (pQueryInfo->window.skey > pQueryInfo->window.ekey) {
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
@ -6825,7 +6869,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
// in case of join query, time range is required.
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey);
if (timeRange == 0 && pQueryInfo->window.skey == 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
@ -6839,6 +6882,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return code;
}
updateLastScanOrderIfNeeded(pQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo);
if (pQuerySql->fillType != NULL) {

View File

@ -643,7 +643,6 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
}
pSql->epSet.inUse = rand()%pSql->epSet.numOfEps;
pQueryMsg->head.vgId = htonl(vgId);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
@ -658,8 +657,6 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(index >= 0 && index < numOfVgroups);
tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
// set the vgroup info
@ -669,6 +666,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables
tscDebug("%p query on stable, vgId:%d, numOfTables:%d, vgIndex:%d, numOfVgroups:%d", pSql,
pTableIdList->vgInfo.vgId, numOfTables, index, numOfVgroups);
// serialize each table id info
for(int32_t i = 0; i < numOfTables; ++i) {
STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
@ -754,6 +754,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
pQueryMsg->sqlstrLen = htonl(sqlLen);
pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen);
pQueryMsg->sw.gap = htobe64(pQueryInfo->sessionWindow.gap);
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number

View File

@ -1062,7 +1062,6 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
}

View File

@ -239,6 +239,21 @@ bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) {
return false;
}
bool tscGroupbyColumn(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
SSqlGroupbyExpr* pGroupbyExpr = &pQueryInfo->groupbyExpr;
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
SColIndex* pIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < numOfCols) { // group by normal columns
return true;
}
}
return false;
}
bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {

View File

@ -485,6 +485,7 @@ typedef struct {
int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode
SInterval interval;
SSessionWindow sw; // session window
uint16_t tagCondLen; // tag length in current query
uint32_t tbnameCondLen; // table name filter condition string length
int16_t numOfGroupCols; // num of group by columns

View File

@ -135,74 +135,75 @@
#define TK_FROM 116
#define TK_VARIABLE 117
#define TK_INTERVAL 118
#define TK_FILL 119
#define TK_SLIDING 120
#define TK_ORDER 121
#define TK_BY 122
#define TK_ASC 123
#define TK_DESC 124
#define TK_GROUP 125
#define TK_HAVING 126
#define TK_LIMIT 127
#define TK_OFFSET 128
#define TK_SLIMIT 129
#define TK_SOFFSET 130
#define TK_WHERE 131
#define TK_NOW 132
#define TK_RESET 133
#define TK_QUERY 134
#define TK_ADD 135
#define TK_COLUMN 136
#define TK_TAG 137
#define TK_CHANGE 138
#define TK_SET 139
#define TK_KILL 140
#define TK_CONNECTION 141
#define TK_STREAM 142
#define TK_COLON 143
#define TK_ABORT 144
#define TK_AFTER 145
#define TK_ATTACH 146
#define TK_BEFORE 147
#define TK_BEGIN 148
#define TK_CASCADE 149
#define TK_CLUSTER 150
#define TK_CONFLICT 151
#define TK_COPY 152
#define TK_DEFERRED 153
#define TK_DELIMITERS 154
#define TK_DETACH 155
#define TK_EACH 156
#define TK_END 157
#define TK_EXPLAIN 158
#define TK_FAIL 159
#define TK_FOR 160
#define TK_IGNORE 161
#define TK_IMMEDIATE 162
#define TK_INITIALLY 163
#define TK_INSTEAD 164
#define TK_MATCH 165
#define TK_KEY 166
#define TK_OF 167
#define TK_RAISE 168
#define TK_REPLACE 169
#define TK_RESTRICT 170
#define TK_ROW 171
#define TK_STATEMENT 172
#define TK_TRIGGER 173
#define TK_VIEW 174
#define TK_SEMI 175
#define TK_NONE 176
#define TK_PREV 177
#define TK_LINEAR 178
#define TK_IMPORT 179
#define TK_METRIC 180
#define TK_TBNAME 181
#define TK_JOIN 182
#define TK_METRICS 183
#define TK_INSERT 184
#define TK_INTO 185
#define TK_VALUES 186
#define TK_SESSION 119
#define TK_FILL 120
#define TK_SLIDING 121
#define TK_ORDER 122
#define TK_BY 123
#define TK_ASC 124
#define TK_DESC 125
#define TK_GROUP 126
#define TK_HAVING 127
#define TK_LIMIT 128
#define TK_OFFSET 129
#define TK_SLIMIT 130
#define TK_SOFFSET 131
#define TK_WHERE 132
#define TK_NOW 133
#define TK_RESET 134
#define TK_QUERY 135
#define TK_ADD 136
#define TK_COLUMN 137
#define TK_TAG 138
#define TK_CHANGE 139
#define TK_SET 140
#define TK_KILL 141
#define TK_CONNECTION 142
#define TK_STREAM 143
#define TK_COLON 144
#define TK_ABORT 145
#define TK_AFTER 146
#define TK_ATTACH 147
#define TK_BEFORE 148
#define TK_BEGIN 149
#define TK_CASCADE 150
#define TK_CLUSTER 151
#define TK_CONFLICT 152
#define TK_COPY 153
#define TK_DEFERRED 154
#define TK_DELIMITERS 155
#define TK_DETACH 156
#define TK_EACH 157
#define TK_END 158
#define TK_EXPLAIN 159
#define TK_FAIL 160
#define TK_FOR 161
#define TK_IGNORE 162
#define TK_IMMEDIATE 163
#define TK_INITIALLY 164
#define TK_INSTEAD 165
#define TK_MATCH 166
#define TK_KEY 167
#define TK_OF 168
#define TK_RAISE 169
#define TK_REPLACE 170
#define TK_RESTRICT 171
#define TK_ROW 172
#define TK_STATEMENT 173
#define TK_TRIGGER 174
#define TK_VIEW 175
#define TK_SEMI 176
#define TK_NONE 177
#define TK_PREV 178
#define TK_LINEAR 179
#define TK_IMPORT 180
#define TK_METRIC 181
#define TK_TBNAME 182
#define TK_JOIN 183
#define TK_METRICS 184
#define TK_INSERT 185
#define TK_INTO 186
#define TK_VALUES 187
#define TK_SPACE 300

View File

@ -72,6 +72,11 @@ typedef struct SInterval {
int64_t offset;
} SInterval;
typedef struct SSessionWindow {
int64_t gap; // gap between two session window(in microseconds)
int32_t primaryColId; // primary timestamp column
} SSessionWindow;
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);

View File

@ -185,7 +185,6 @@ typedef struct SQuery {
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query
int32_t interBufSize; // intermediate buffer sizse
@ -196,6 +195,7 @@ typedef struct SQuery {
STimeWindow window;
SInterval interval;
SSessionWindow sw;
int16_t precision;
int16_t numOfOutput;
int16_t fillType;
@ -279,10 +279,11 @@ enum OPERATOR_TYPE_E {
OP_Groupby = 8,
OP_Limit = 9,
OP_Offset = 10,
OP_TimeInterval = 11,
OP_Fill = 12,
OP_MultiTableAggregate = 13,
OP_MultiTableTimeInterval = 14,
OP_TimeWindow = 11,
OP_SessionWindow = 12,
OP_Fill = 13,
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
};
typedef struct SOperatorInfo {
@ -411,6 +412,14 @@ typedef struct SGroupbyOperatorInfo {
char *prevData; // previous group by value
} SGroupbyOperatorInfo;
typedef struct SSWindowOperatorInfo {
SOptrBasicInfo binfo;
STimeWindow curWindow; // current time window
TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows
int32_t start; // start row index
} SSWindowOperatorInfo;
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,

View File

@ -58,14 +58,19 @@ typedef struct SIntervalVal {
SStrToken offset;
} SIntervalVal;
typedef struct SSessionWindowVal {
SStrToken col;
SStrToken gap;
} SSessionWindowVal;
typedef struct SQuerySQL {
struct tSQLExprList *pSelection; // select clause
SArray * from; // from clause SArray<tVariantListItem>
struct tSQLExpr * pWhere; // where clause [optional]
SArray * pGroupby; // groupby clause, only for tags[optional], SArray<tVariantListItem>
SArray * pSortOrder; // orderby [optional], SArray<tVariantListItem>
SStrToken interval; // interval [optional]
SStrToken offset; // offset window [optional]
SIntervalVal interval; // (interval, interval_offset) [optional]
SSessionWindowVal sessionVal; // session window [optional]
SStrToken sliding; // sliding window [optional]
SLimitVal limit; // limit offset [optional]
SLimitVal slimit; // group limit offset [optional]
@ -256,8 +261,8 @@ tSQLExprList *tSqlExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SStrToken
void tSqlExprListDestroy(tSQLExprList *pList);
SQuerySQL *tSetQuerySqlElems(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SQuerySQL *tSetQuerySqlNode(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pGLimit);
SCreateTableSQL *tSetCreateSqlElems(SArray *pCols, SArray *pTags, SQuerySQL *pSelect, int32_t type);

View File

@ -452,8 +452,8 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
//////////////////////// The SELECT statement /////////////////////////////////
%type select {SQuerySQL*}
%destructor select {doDestroyQuerySql($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlElems(&T, W, X, Y, P, Z, &K, &S, F, &L, &G);
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G);
}
%type union {SSubclauseInfo*}
@ -471,7 +471,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); }
// select server_version(), select client_version(),
// select server_state();
select(A) ::= SELECT(T) selcollist(W). {
A = tSetQuerySqlElems(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
}
// selcollist is a list of expressions that are to become the return
@ -546,13 +546,21 @@ tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z) ids(F). {
tmvar(A) ::= VARIABLE(X). {A = X;}
%type interval_opt {SIntervalVal}
interval_opt(N) ::= INTERVAL LP tmvar(E) RP. {N.interval = E; N.offset.n = 0; N.offset.z = NULL; N.offset.type = 0;}
interval_opt(N) ::= INTERVAL LP tmvar(E) COMMA tmvar(O) RP. {N.interval = E; N.offset = O;}
interval_opt(N) ::= INTERVAL LP tmvar(E) RP. {N.interval = E; N.offset.n = 0;}
interval_opt(N) ::= INTERVAL LP tmvar(E) COMMA tmvar(X) RP. {N.interval = E; N.offset = X;}
interval_opt(N) ::= . {memset(&N, 0, sizeof(N));}
%type session_option {SSessionWindowVal}
session_option(X) ::= . {X.col.n = 0; X.gap.n = 0;}
session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. {
V.n += Z.n;
X.col = V;
X.gap = Y;
}
%type fill_opt {SArray*}
%destructor fill_opt {taosArrayDestroy($$);}
fill_opt(N) ::= . {N = 0; }
fill_opt(N) ::= . { N = 0; }
fill_opt(N) ::= FILL LP ID(Y) COMMA tagitemlist(X) RP. {
tVariant A = {0};
toTSDBType(Y.type);

View File

@ -171,6 +171,7 @@ static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
@ -1254,8 +1255,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows,
numOfOutput);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput);
}
// restore current time window
@ -1268,8 +1268,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// window start key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows,
numOfOutput);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
STimeWindow nextWin = win;
while (1) {
@ -1287,13 +1286,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep =
getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
}
if (pQuery->timeWindowInterpo) {
@ -1323,6 +1320,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
continue;
}
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (pInfo->prevData == NULL || (memcmp(pInfo->prevData, val, bytes) != 0)) {
if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes);
@ -1347,6 +1345,66 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
}
}
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->pQuery->current;
// primary timestamp column
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SOptrBasicInfo* pBInfo = &pInfo->binfo;
int64_t gap = pOperator->pRuntimeEnv->pQuery->sw.gap;
pInfo->numOfRows = 0;
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
if (pInfo->prevTs == INT64_MIN) {
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j];
pInfo->numOfRows = 1;
pInfo->start = j;
} else if (tsList[j] - pInfo->prevTs <= gap) {
pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j];
pInfo->numOfRows += 1;
pInfo->start = j;
} else { // start a new session window
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j];
pInfo->numOfRows = 1;
pInfo->start = j;
}
}
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
}
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
int64_t v = -1;
GET_TYPED_DATA(v, int64_t, type, pData);
@ -1700,6 +1758,13 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
} else if (pQuery->sw.gap > 0) {
pRuntimeEnv->proot = createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
@ -2482,7 +2547,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block.
if (pQuery->numOfFilterCols > 0 || pQuery->groupbyColumn ||
if (pQuery->numOfFilterCols > 0 || pQuery->groupbyColumn || pQuery->sw.gap > 0 ||
(QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, &pBlock->info))) {
(*status) = BLK_DATA_ALL_NEEDED;
}
@ -3039,7 +3104,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t numOfOutput = pOperator->numOfOutput;
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery) || pQuery->sw.gap > 0) {
// for each group result, call the finalize function for each column
if (pQuery->groupbyColumn) {
closeAllResultRows(pResultRowInfo);
@ -4240,7 +4305,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pCtx = pAggInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_TimeInterval) {
} else if (pDownstream->operatorType == OP_TimeWindow) {
STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info;
pTableScanInfo->pCtx = pIntervalInfo->pCtx;
@ -4264,6 +4329,12 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} else if (pDownstream->operatorType == OP_Arithmetic) {
SArithOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_SessionWindow) {
SSWindowOperatorInfo* pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
@ -4619,6 +4690,62 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
return pIntervalInfo->pRes;
}
static SSDataBlock* doSessionWindowAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SSWindowOperatorInfo* pWindowInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes;
}
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
STimeWindow win = pQuery->window;
SOperatorInfo* upstream = pOperator->upstream;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
if (pBlock == NULL) {
break;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQuery->order.order);
doSessionWindowAggImpl(pOperator, pWindowInfo, pBlock);
}
// restore the value
pQuery->order.order = order;
pQuery->window = win;
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* hashGroupbyAggregate(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
@ -4908,7 +5035,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = OP_TimeInterval;
pOperator->operatorType = OP_TimeWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
@ -4922,6 +5049,31 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
return pOperator;
}
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->prevTs = INT64_MIN;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = OP_SessionWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSessionWindowAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator;
}
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
@ -5203,6 +5355,17 @@ static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) {
return false;
}
if (pQueryMsg->sw.gap < 0 || pQueryMsg->sw.primaryColId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
qError("qmsg:%p illegal value of session window time %" PRId64, pQueryMsg, pQueryMsg->sw.gap);
return false;
}
if (pQueryMsg->sw.gap > 0 && pQueryMsg->interval.interval > 0) {
qError("qmsg:%p illegal value of session window time %" PRId64" and interval value %"PRId64, pQueryMsg,
pQueryMsg->sw.gap, pQueryMsg->interval.interval);
return false;
}
if (pQueryMsg->numOfTables <= 0) {
qError("qmsg:%p illegal value of numOfTables %d", pQueryMsg, pQueryMsg->numOfTables);
return false;
@ -5315,6 +5478,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput);
pQueryMsg->sqlstrLen = htonl(pQueryMsg->sqlstrLen);
pQueryMsg->prevResultLen = htonl(pQueryMsg->prevResultLen);
pQueryMsg->sw.gap = htobe64(pQueryMsg->sw.gap);
pQueryMsg->sw.primaryColId = htonl(pQueryMsg->sw.primaryColId);
// query msg safety check
if (!validateQueryMsg(pQueryMsg)) {
@ -5953,7 +6118,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
pQuery->tagColList = pTagCols;
pQuery->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit;
pQuery->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX;
pQuery->sw = pQueryMsg->sw;
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) {
goto _cleanup;
@ -6023,7 +6188,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
pQuery->queryWindowIdentical = true;
bool groupByCol = isGroupbyColumn(pQuery->pGroupbyExpr);
STimeWindow window = pQuery->window;
@ -6042,11 +6206,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
for(int32_t j = 0; j < s; ++j) {
STableKeyInfo* info = taosArrayGet(pa, j);
window.skey = info->lastKey;
if (info->lastKey != pQuery->window.skey) {
pQInfo->query.queryWindowIdentical = false;
}
void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
STableQueryInfo* item = createTableQueryInfo(pQuery, info->pTable, groupByCol, window, buf);

View File

@ -550,8 +550,8 @@ void tSqlSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
/*
* extract the select info out of sql string
*/
SQuerySQL *tSetQuerySqlElems(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SQuerySQL *tSetQuerySqlNode(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pGLimit) {
assert(pSelection != NULL);
@ -574,14 +574,17 @@ SQuerySQL *tSetQuerySqlElems(SStrToken *pSelectToken, tSQLExprList *pSelection,
}
if (pInterval != NULL) {
pQuery->interval = pInterval->interval;
pQuery->offset = pInterval->offset;
pQuery->interval = *pInterval;
}
if (pSliding != NULL) {
pQuery->sliding = *pSliding;
}
if (pSession != NULL) {
pQuery->sessionVal = *pSession;
}
pQuery->fillType = pFill;
return pQuery;
}

View File

@ -139,6 +139,7 @@ static SKeyword keywordTable[] = {
{"FROM", TK_FROM},
{"VARIABLE", TK_VARIABLE},
{"INTERVAL", TK_INTERVAL},
{"SESSION", TK_SESSION},
{"FILL", TK_FILL},
{"SLIDING", TK_SLIDING},
{"ORDER", TK_ORDER},

View File

@ -42,12 +42,11 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
}
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) {
pResultRowInfo->capacity = size;
pResultRowInfo->type = type;
pResultRowInfo->curIndex = -1;
pResultRowInfo->size = 0;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
pResultRowInfo->curIndex = -1;
pResultRowInfo->capacity = size;
pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES);
if (pResultRowInfo->pResult == NULL) {

File diff suppressed because it is too large Load Diff

View File

@ -372,7 +372,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// kill current query and free corresponding resources.
if (pRetrieve->free == 1) {
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);