td-1245: refactor interval/sliding/offset to one struct
This commit is contained in:
parent
4456951e66
commit
76d407395f
|
@ -69,6 +69,7 @@ typedef struct SJoinSupporter {
|
|||
SSubqueryState* pState;
|
||||
SSqlObj* pObj; // parent SqlObj
|
||||
int32_t subqueryIndex; // index of sub query
|
||||
SInterval interval;
|
||||
SLimitVal limit; // limit info
|
||||
uint64_t uid; // query table uid
|
||||
SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution
|
||||
|
|
|
@ -226,13 +226,8 @@ typedef struct SQueryInfo {
|
|||
int16_t command; // the command may be different for each subclause, so keep it seperately.
|
||||
uint32_t type; // query/insert type
|
||||
// TODO refactor
|
||||
char intervalTimeUnit;
|
||||
char slidingTimeUnit;
|
||||
char offsetTimeUnit;
|
||||
STimeWindow window; // query time window
|
||||
int64_t intervalTime; // aggregation time window range
|
||||
int64_t slidingTime; // sliding window in mseconds
|
||||
int64_t offsetTime; // start offset of each time window
|
||||
SInterval interval;
|
||||
int32_t tz; // query client timezone
|
||||
|
||||
SSqlGroupbyExpr groupbyExpr; // group by tags info
|
||||
|
|
|
@ -368,13 +368,12 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
|
|||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
|
||||
TSKEY stime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey : pQueryInfo->window.ekey;
|
||||
int64_t revisedSTime =
|
||||
taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
|
||||
int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision);
|
||||
|
||||
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
|
||||
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
|
||||
pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
|
||||
4096, (int32_t)numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit,
|
||||
4096, (int32_t)numOfCols, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit,
|
||||
tinfo.precision, pQueryInfo->fillType, pFillCol);
|
||||
}
|
||||
}
|
||||
|
@ -551,7 +550,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
|
|||
}
|
||||
|
||||
// primary timestamp column is involved in final result
|
||||
if (pQueryInfo->intervalTime != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
numOfGroupByCols++;
|
||||
}
|
||||
|
||||
|
@ -568,7 +567,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
|
|||
orderIdx[i] = startCols++;
|
||||
}
|
||||
|
||||
if (pQueryInfo->intervalTime != 0) {
|
||||
if (pQueryInfo->interval.interval != 0) {
|
||||
// the first column is the timestamp, handles queries like "interval(10m) group by tags"
|
||||
orderIdx[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX;
|
||||
}
|
||||
|
@ -612,12 +611,12 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
|
|||
* super table interval query
|
||||
* if the order columns is the primary timestamp, all result data belongs to one group
|
||||
*/
|
||||
assert(pQueryInfo->intervalTime > 0);
|
||||
assert(pQueryInfo->interval.interval > 0);
|
||||
if (numOfCols == 1) {
|
||||
return true;
|
||||
}
|
||||
} else { // simple group by query
|
||||
assert(pQueryInfo->intervalTime == 0);
|
||||
assert(pQueryInfo->interval.interval == 0);
|
||||
}
|
||||
|
||||
// only one row exists
|
||||
|
@ -825,8 +824,7 @@ void savePrevRecordAndSetupFillInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQ
|
|||
|
||||
if (pFillInfo != NULL) {
|
||||
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
|
||||
int64_t revisedSTime =
|
||||
taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
|
||||
int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision);
|
||||
|
||||
taosResetFillInfo(pFillInfo, revisedSTime);
|
||||
}
|
||||
|
@ -839,7 +837,7 @@ void savePrevRecordAndSetupFillInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQ
|
|||
}
|
||||
|
||||
static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, SQueryInfo* pQueryInfo) {
|
||||
assert(pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE);
|
||||
assert(pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE);
|
||||
|
||||
tFilePage * pBeforeFillData = pLocalReducer->pResultBuf;
|
||||
|
||||
|
@ -1220,7 +1218,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
|
|||
#endif
|
||||
|
||||
// no interval query, no fill operation
|
||||
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
|
||||
if (pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
|
||||
genFinalResWithoutFill(pRes, pLocalReducer, pQueryInfo);
|
||||
} else {
|
||||
SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
|
||||
|
@ -1258,13 +1256,10 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
|
|||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
|
||||
int8_t precision = tinfo.precision;
|
||||
|
||||
// for group result interpolation, do not return if not data is generated
|
||||
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
|
||||
TSKEY skey = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey:pQueryInfo->window.ekey;//MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
|
||||
int64_t newTime =
|
||||
taosGetIntervalStartTimestamp(skey, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
|
||||
int64_t newTime = taosTimeTruncate(skey, &pQueryInfo->interval, tinfo.precision);
|
||||
taosResetFillInfo(pLocalReducer->pFillInfo, newTime);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -596,18 +596,18 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
|
|||
|
||||
// interval is not null
|
||||
SStrToken* t = &pQuerySql->interval;
|
||||
if (parseDuration(t->z, t->n, &pQueryInfo->intervalTime, &pQueryInfo->intervalTimeUnit) != TSDB_CODE_SUCCESS) {
|
||||
if (parseDuration(t->z, t->n, &pQueryInfo->interval.interval, &pQueryInfo->interval.intervalUnit) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
if (pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y') {
|
||||
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') {
|
||||
// if the unit of time window value is millisecond, change the value from microsecond
|
||||
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->intervalTime = pQueryInfo->intervalTime / 1000;
|
||||
pQueryInfo->interval.interval = pQueryInfo->interval.interval / 1000;
|
||||
}
|
||||
|
||||
// interval cannot be less than 10 milliseconds
|
||||
if (pQueryInfo->intervalTime < tsMinIntervalTime) {
|
||||
if (pQueryInfo->interval.interval < tsMinIntervalTime) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
}
|
||||
|
@ -641,7 +641,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
|
|||
* check invalid SQL:
|
||||
* select tbname, tags_fields from super_table_name interval(1s)
|
||||
*/
|
||||
if (tscQueryTags(pQueryInfo) && pQueryInfo->intervalTime > 0) {
|
||||
if (tscQueryTags(pQueryInfo) && pQueryInfo->interval.interval > 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
|
@ -679,34 +679,52 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
|
|||
}
|
||||
|
||||
int32_t parseOffsetClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
|
||||
const char* msg1 = "interval offset cannot be negative";
|
||||
const char* msg2 = "interval offset should be shorter than interval";
|
||||
const char* msg3 = "cannot use 'year' as offset when interval is 'month'";
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
|
||||
SStrToken* pOffset = &pQuerySql->offset;
|
||||
if (pOffset->n == 0) {
|
||||
pQueryInfo->offsetTimeUnit = pQueryInfo->offsetTimeUnit;
|
||||
pQueryInfo->offsetTime = 0;
|
||||
SStrToken* t = &pQuerySql->offset;
|
||||
if (t->n == 0) {
|
||||
pQueryInfo->interval.offsetUnit = pQueryInfo->interval.intervalUnit;
|
||||
pQueryInfo->interval.offset = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
getTimestampInUsFromStr(pOffset->z, pOffset->n, &pQueryInfo->offsetTime);
|
||||
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->offsetTime /= 1000;
|
||||
if (parseDuration(t->z, t->n, &pQueryInfo->interval.offset, &pQueryInfo->interval.offsetUnit) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
/*
|
||||
if (pQueryInfo->offsetTime < 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0);
|
||||
}
|
||||
|
||||
if (pQueryInfo->slidingTime >= pQueryInfo->intervalTime) {
|
||||
if (pQueryInfo->interval.offset < 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
if ((pQueryInfo->intervalTime != 0) && (pQueryInfo->intervalTime/pQueryInfo->slidingTime > INTERVAL_SLIDING_FACTOR)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
if (pQueryInfo->interval.offsetUnit != 'n' && pQueryInfo->interval.offsetUnit != 'y') {
|
||||
// if the unit of time window value is millisecond, change the value from microsecond
|
||||
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->interval.offset = pQueryInfo->interval.offset / 1000;
|
||||
}
|
||||
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') {
|
||||
if (pQueryInfo->interval.offset >= pQueryInfo->interval.interval) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
}
|
||||
} else if (pQueryInfo->interval.offsetUnit == pQueryInfo->interval.intervalUnit) {
|
||||
if (pQueryInfo->interval.offset >= pQueryInfo->interval.interval) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
} else if (pQueryInfo->interval.intervalUnit == 'n' && pQueryInfo->interval.offsetUnit == 'y') {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
} else if (pQueryInfo->interval.intervalUnit == 'y' && pQueryInfo->interval.offsetUnit == 'n') {
|
||||
if (pQueryInfo->interval.interval * 12 <= pQueryInfo->interval.offset) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
} else {
|
||||
// TODO: offset should be shorter than interval, but how to check
|
||||
// conflicts like 30days offset and 1 month interval
|
||||
}
|
||||
*/
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -724,29 +742,29 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
|
|||
|
||||
SStrToken* pSliding = &pQuerySql->sliding;
|
||||
if (pSliding->n == 0) {
|
||||
pQueryInfo->slidingTimeUnit = pQueryInfo->intervalTimeUnit;
|
||||
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
|
||||
pQueryInfo->interval.slidingUnit = pQueryInfo->interval.intervalUnit;
|
||||
pQueryInfo->interval.sliding = pQueryInfo->interval.interval;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pQueryInfo->intervalTimeUnit == 'n' || pQueryInfo->intervalTimeUnit == 'y') {
|
||||
if (pQueryInfo->interval.intervalUnit == 'n' || pQueryInfo->interval.intervalUnit == 'y') {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->slidingTime);
|
||||
getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->interval.sliding);
|
||||
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->slidingTime /= 1000;
|
||||
pQueryInfo->interval.sliding /= 1000;
|
||||
}
|
||||
|
||||
if (pQueryInfo->slidingTime < tsMinSlidingTime) {
|
||||
if (pQueryInfo->interval.sliding < tsMinSlidingTime) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0);
|
||||
}
|
||||
|
||||
if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) {
|
||||
if (pQueryInfo->interval.sliding > pQueryInfo->interval.interval) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
if ((pQueryInfo->intervalTime != 0) && (pQueryInfo->intervalTime/pQueryInfo->slidingTime > INTERVAL_SLIDING_FACTOR)) {
|
||||
if ((pQueryInfo->interval.interval != 0) && (pQueryInfo->interval.interval/pQueryInfo->interval.sliding > INTERVAL_SLIDING_FACTOR)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
||||
|
@ -4758,9 +4776,9 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
|||
const char* msg0 = "sample interval can not be less than 10ms.";
|
||||
const char* msg1 = "functions not allowed in select clause";
|
||||
|
||||
if (pQueryInfo->intervalTime != 0 && pQueryInfo->intervalTime < 10 &&
|
||||
pQueryInfo->intervalTimeUnit != 'n' &&
|
||||
pQueryInfo->intervalTimeUnit != 'y') {
|
||||
if (pQueryInfo->interval.interval != 0 && pQueryInfo->interval.interval < 10 &&
|
||||
pQueryInfo->interval.intervalUnit != 'n' &&
|
||||
pQueryInfo->interval.intervalUnit != 'y') {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0);
|
||||
}
|
||||
|
||||
|
@ -5545,7 +5563,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
|
|||
insertResultField(pQueryInfo, (int32_t)size, &ids, bytes, (int8_t)type, name, pExpr);
|
||||
} else {
|
||||
// if this query is "group by" normal column, interval is not allowed
|
||||
if (pQueryInfo->intervalTime > 0) {
|
||||
if (pQueryInfo->interval.interval > 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
||||
|
@ -5578,7 +5596,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
|||
|
||||
// only retrieve tags, group by is not supportted
|
||||
if (tscQueryTags(pQueryInfo)) {
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || pQueryInfo->intervalTime > 0) {
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || pQueryInfo->interval.interval > 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
} else {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -6030,7 +6048,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
if (parseIntervalClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
} else {
|
||||
if ((pQueryInfo->intervalTime > 0) &&
|
||||
if ((pQueryInfo->interval.interval > 0) &&
|
||||
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
@ -6060,7 +6078,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
* not here.
|
||||
*/
|
||||
if (pQuerySql->fillType != NULL) {
|
||||
if (pQueryInfo->intervalTime == 0) {
|
||||
if (pQueryInfo->interval.interval == 0) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
|
@ -6228,7 +6246,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
|||
if (parseIntervalClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
} else {
|
||||
if ((pQueryInfo->intervalTime > 0) &&
|
||||
if ((pQueryInfo->interval.interval > 0) &&
|
||||
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
@ -6279,14 +6297,14 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
|||
* the columns may be increased due to group by operation
|
||||
*/
|
||||
if (pQuerySql->fillType != NULL) {
|
||||
if (pQueryInfo->intervalTime == 0 && (!tscIsPointInterpQuery(pQueryInfo))) {
|
||||
if (pQueryInfo->interval.interval == 0 && (!tscIsPointInterpQuery(pQueryInfo))) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
}
|
||||
|
||||
if (pQueryInfo->intervalTime > 0 && pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y') {
|
||||
if (pQueryInfo->interval.interval > 0 && pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') {
|
||||
int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey);
|
||||
// number of result is not greater than 10,000,000
|
||||
if ((timeRange == 0) || (timeRange / pQueryInfo->intervalTime) > MAX_INTERVAL_TIME_WINDOW) {
|
||||
if ((timeRange == 0) || (timeRange / pQueryInfo->interval.interval) > MAX_INTERVAL_TIME_WINDOW) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -647,8 +647,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
if (pQueryInfo->intervalTime < 0) {
|
||||
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
|
||||
if (pQueryInfo->interval.interval < 0) {
|
||||
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->interval.interval);
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
|
@ -675,10 +675,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
|
||||
pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
|
||||
pQueryMsg->numOfCols = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
|
||||
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
|
||||
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
|
||||
pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
|
||||
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
|
||||
pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval);
|
||||
pQueryMsg->interval.sliding = htobe64(pQueryInfo->interval.sliding);
|
||||
pQueryMsg->interval.offset = htobe64(pQueryInfo->interval.offset);
|
||||
pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit;
|
||||
pQueryMsg->interval.slidingUnit = pQueryInfo->interval.slidingUnit;
|
||||
pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit;
|
||||
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
|
||||
pQueryMsg->numOfTags = htonl(numOfTags);
|
||||
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
|
||||
|
|
|
@ -400,43 +400,43 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
|
|||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
|
||||
if (pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y' && pQueryInfo->intervalTime < minIntervalTime) {
|
||||
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.interval < minIntervalTime) {
|
||||
tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%" PRId64, pSql, pStream,
|
||||
pQueryInfo->intervalTime, minIntervalTime);
|
||||
pQueryInfo->intervalTime = minIntervalTime;
|
||||
pQueryInfo->interval.interval, minIntervalTime);
|
||||
pQueryInfo->interval.interval = minIntervalTime;
|
||||
}
|
||||
|
||||
pStream->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
|
||||
pStream->intervalTime = pQueryInfo->intervalTime; // it shall be derived from sql string
|
||||
pStream->intervalTimeUnit = pQueryInfo->interval.intervalUnit;
|
||||
pStream->intervalTime = pQueryInfo->interval.interval; // it shall be derived from sql string
|
||||
|
||||
if (pQueryInfo->slidingTime <= 0) {
|
||||
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
|
||||
pQueryInfo->slidingTimeUnit = pQueryInfo->intervalTimeUnit;
|
||||
if (pQueryInfo->interval.sliding <= 0) {
|
||||
pQueryInfo->interval.sliding = pQueryInfo->interval.interval;
|
||||
pQueryInfo->interval.slidingUnit = pQueryInfo->interval.intervalUnit;
|
||||
}
|
||||
|
||||
int64_t minSlidingTime =
|
||||
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
|
||||
|
||||
if (pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y' && pQueryInfo->slidingTime < minSlidingTime) {
|
||||
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.sliding < minSlidingTime) {
|
||||
tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream,
|
||||
pQueryInfo->slidingTime, minSlidingTime);
|
||||
pQueryInfo->interval.sliding, minSlidingTime);
|
||||
|
||||
pQueryInfo->slidingTime = minSlidingTime;
|
||||
pQueryInfo->interval.sliding = minSlidingTime;
|
||||
}
|
||||
|
||||
if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) {
|
||||
if (pQueryInfo->interval.sliding > pQueryInfo->interval.interval) {
|
||||
tscWarn("%p stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64, pSql, pStream,
|
||||
pQueryInfo->slidingTime, pQueryInfo->intervalTime);
|
||||
pQueryInfo->interval.sliding, pQueryInfo->interval.interval);
|
||||
|
||||
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
|
||||
pQueryInfo->interval.sliding = pQueryInfo->interval.interval;
|
||||
}
|
||||
|
||||
pStream->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
|
||||
pStream->slidingTime = pQueryInfo->slidingTime;
|
||||
pStream->slidingTimeUnit = pQueryInfo->interval.slidingUnit;
|
||||
pStream->slidingTime = pQueryInfo->interval.sliding;
|
||||
|
||||
if (pStream->isProject) {
|
||||
pQueryInfo->intervalTime = 0; // clear the interval value to avoid the force time window split by query processor
|
||||
pQueryInfo->slidingTime = 0;
|
||||
pQueryInfo->interval.interval = 0; // clear the interval value to avoid the force time window split by query processor
|
||||
pQueryInfo->interval.sliding = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -113,7 +113,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
|
|||
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
|
||||
* final results which is acquired after the secondry merge of in the client.
|
||||
*/
|
||||
if (pLimit->offset == 0 || pQueryInfo->intervalTime > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
|
||||
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
|
||||
if (win->skey > elem1.ts) {
|
||||
win->skey = elem1.ts;
|
||||
}
|
||||
|
@ -178,6 +178,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
|
|||
pSupporter->subqueryIndex = index;
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
|
||||
|
||||
memcpy(&pSupporter->interval, &pQueryInfo->interval, sizeof(pSupporter->interval));
|
||||
pSupporter->limit = pQueryInfo->limit;
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
|
||||
|
@ -307,6 +308,9 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
|
|||
// set the second stage sub query for join process
|
||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
|
||||
|
||||
memcpy(&pQueryInfo->interval, &pSupporter->interval, sizeof(pQueryInfo->interval));
|
||||
pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
|
||||
|
||||
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
|
||||
|
||||
pQueryInfo->colList = pSupporter->colList;
|
||||
|
@ -1204,7 +1208,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
|
|||
}
|
||||
|
||||
pNew->cmd.numOfCols = 0;
|
||||
pNewQueryInfo->intervalTime = 0;
|
||||
pNewQueryInfo->interval.interval = 0;
|
||||
pSupporter->limit = pNewQueryInfo->limit;
|
||||
|
||||
pNewQueryInfo->limit.limit = -1;
|
||||
|
@ -2185,7 +2189,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
|
|||
}
|
||||
|
||||
// primary key column cannot be null in interval query, no need to check
|
||||
if (i == 0 && pQueryInfo->intervalTime > 0) {
|
||||
if (i == 0 && pQueryInfo->interval.interval > 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -1899,10 +1899,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
pNewQueryInfo->command = pQueryInfo->command;
|
||||
pNewQueryInfo->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
|
||||
pNewQueryInfo->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
|
||||
pNewQueryInfo->intervalTime = pQueryInfo->intervalTime;
|
||||
pNewQueryInfo->slidingTime = pQueryInfo->slidingTime;
|
||||
memcpy(&pNewQueryInfo->interval, &pQueryInfo->interval, sizeof(pNewQueryInfo->interval));
|
||||
pNewQueryInfo->type = pQueryInfo->type;
|
||||
pNewQueryInfo->window = pQueryInfo->window;
|
||||
pNewQueryInfo->limit = pQueryInfo->limit;
|
||||
|
|
|
@ -460,12 +460,7 @@ typedef struct {
|
|||
int16_t order;
|
||||
int16_t orderColId;
|
||||
int16_t numOfCols; // the number of columns will be load from vnode
|
||||
int64_t intervalTime; // time interval for aggregation, in million second
|
||||
int64_t slidingTime; // value for sliding window
|
||||
int64_t offsetTime; // start offset for interval query
|
||||
char intervalTimeUnit;
|
||||
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
|
||||
char offsetTimeUnit;
|
||||
SInterval interval;
|
||||
uint16_t tagCondLen; // tag length in current query
|
||||
int16_t numOfGroupCols; // num of group by columns
|
||||
int16_t orderByIdx;
|
||||
|
|
|
@ -63,6 +63,19 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
typedef struct SInterval {
|
||||
char intervalUnit;
|
||||
char slidingUnit;
|
||||
char offsetUnit;
|
||||
int64_t interval;
|
||||
int64_t sliding;
|
||||
int64_t offset;
|
||||
} SInterval;
|
||||
|
||||
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
|
||||
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
|
||||
|
||||
int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts);
|
||||
int32_t parseDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit);
|
||||
|
||||
|
|
|
@ -403,6 +403,102 @@ int32_t parseDuration(const char* token, int32_t tokenLen, int64_t* duration, ch
|
|||
return getTimestampInUsFromStrImpl(*duration, *unit, duration);
|
||||
}
|
||||
|
||||
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
|
||||
if (duration == 0) {
|
||||
return t;
|
||||
}
|
||||
if (unit == 'y') {
|
||||
duration *= 12;
|
||||
} else if (unit != 'n') {
|
||||
return t + duration;
|
||||
}
|
||||
|
||||
t /= 1000;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
t /= 1000;
|
||||
}
|
||||
|
||||
struct tm tm;
|
||||
time_t tt = (time_t)t;
|
||||
localtime_r(&tt, &tm);
|
||||
int mon = tm.tm_year * 12 + tm.tm_mon + (int)duration;
|
||||
tm.tm_year = mon / 12;
|
||||
tm.tm_mon = mon % 12;
|
||||
|
||||
t = mktime(&tm) * 1000L;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
t *= 1000L;
|
||||
}
|
||||
|
||||
return t;
|
||||
}
|
||||
|
||||
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision) {
|
||||
if (pInterval->sliding == 0) {
|
||||
assert(pInterval->interval == 0);
|
||||
return t;
|
||||
}
|
||||
|
||||
int64_t start = t;
|
||||
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
|
||||
start /= 1000;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
start /= 1000;
|
||||
}
|
||||
struct tm tm;
|
||||
time_t t = (time_t)start;
|
||||
localtime_r(&t, &tm);
|
||||
tm.tm_sec = 0;
|
||||
tm.tm_min = 0;
|
||||
tm.tm_hour = 0;
|
||||
tm.tm_mday = 1;
|
||||
|
||||
if (pInterval->slidingUnit == 'y') {
|
||||
tm.tm_mon = 0;
|
||||
tm.tm_year = (int)(tm.tm_year / pInterval->sliding * pInterval->sliding);
|
||||
} else {
|
||||
int mon = tm.tm_year * 12 + tm.tm_mon;
|
||||
mon = (int)(mon / pInterval->sliding * pInterval->sliding);
|
||||
tm.tm_year = mon / 12;
|
||||
tm.tm_mon = mon % 12;
|
||||
}
|
||||
|
||||
start = mktime(&tm) * 1000L;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
start *= 1000L;
|
||||
}
|
||||
} else {
|
||||
int64_t delta = t - pInterval->interval;
|
||||
int32_t factor = delta > 0 ? 1 : -1;
|
||||
|
||||
start = (delta / pInterval->sliding + factor) * pInterval->sliding;
|
||||
|
||||
if (pInterval->intervalUnit == 'd' || pInterval->intervalUnit == 'w') {
|
||||
/*
|
||||
* here we revised the start time of day according to the local time zone,
|
||||
* but in case of DST, the start time of one day need to be dynamically decided.
|
||||
*/
|
||||
// todo refactor to extract function that is available for Linux/Windows/Mac platform
|
||||
#if defined(WINDOWS) && _MSC_VER >= 1900
|
||||
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
|
||||
int64_t timezone = _timezone;
|
||||
int32_t daylight = _daylight;
|
||||
char** tzname = _tzname;
|
||||
#endif
|
||||
|
||||
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
|
||||
start += timezone * t;
|
||||
}
|
||||
|
||||
int64_t end = start + pInterval->interval - 1;
|
||||
if (end < t) {
|
||||
start += pInterval->sliding;
|
||||
}
|
||||
}
|
||||
|
||||
return taosTimeAdd(start, pInterval->offset, pInterval->intervalUnit, precision);
|
||||
}
|
||||
|
||||
// internal function, when program is paused in debugger,
|
||||
// one can call this function from debugger to print a
|
||||
// timestamp as human readable string, for example (gdb):
|
||||
|
|
|
@ -132,12 +132,9 @@ typedef struct SQueryCostInfo {
|
|||
typedef struct SQuery {
|
||||
int16_t numOfCols;
|
||||
int16_t numOfTags;
|
||||
char intervalTimeUnit;
|
||||
char slidingTimeUnit; // interval data type, used for daytime revise
|
||||
SOrderVal order;
|
||||
STimeWindow window;
|
||||
int64_t intervalTime;
|
||||
int64_t slidingTime; // sliding time for sliding window query
|
||||
SInterval interval;
|
||||
int16_t precision;
|
||||
int16_t numOfOutput;
|
||||
int16_t fillType;
|
||||
|
|
|
@ -51,12 +51,11 @@ typedef struct SFillInfo {
|
|||
int32_t rowSize; // size of each row
|
||||
// char ** pTags; // tags value for current interpolation
|
||||
SFillTagColInfo* pTags; // tags value for filling gap
|
||||
int64_t slidingTime; // sliding value to determine the number of result for a given time window
|
||||
SInterval interval;
|
||||
char * prevValues; // previous row of data, to generate the interpolation results
|
||||
char * nextValues; // next row of data
|
||||
char** pData; // original result data block involved in filling data
|
||||
int32_t capacityInRows; // data buffer size in rows
|
||||
int8_t slidingUnit; // sliding time unit
|
||||
int8_t precision; // time resoluation
|
||||
SFillColInfo* pFillCol; // column info for fill operations
|
||||
} SFillInfo;
|
||||
|
|
|
@ -131,21 +131,21 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
|
|||
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
||||
static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
|
||||
|
||||
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
|
||||
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
|
||||
|
||||
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
|
||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
if (pQuery->intervalTimeUnit != 'n' && pQuery->intervalTimeUnit != 'y') {
|
||||
tw->skey += pQuery->slidingTime * factor;
|
||||
tw->ekey = tw->skey + pQuery->intervalTime - 1;
|
||||
if (pQuery->interval.intervalUnit != 'n' && pQuery->interval.intervalUnit != 'y') {
|
||||
tw->skey += pQuery->interval.sliding * factor;
|
||||
tw->ekey = tw->skey + pQuery->interval.interval - 1;
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t key = tw->skey / 1000, interval = pQuery->intervalTime;
|
||||
int64_t key = tw->skey / 1000, interval = pQuery->interval.interval;
|
||||
if (pQuery->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
key /= 1000;
|
||||
}
|
||||
if (pQuery->intervalTimeUnit == 'y') {
|
||||
if (pQuery->interval.intervalUnit == 'y') {
|
||||
interval *= 12;
|
||||
}
|
||||
|
||||
|
@ -510,10 +510,10 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
|
|||
|
||||
if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value
|
||||
w.skey = pWindowResInfo->prevSKey;
|
||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||
w.ekey = taosAddNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||
if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
|
||||
w.ekey = taosTimeAdd(w.skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision);
|
||||
} else {
|
||||
w.ekey = w.skey + pQuery->intervalTime - 1;
|
||||
w.ekey = w.skey + pQuery->interval.interval - 1;
|
||||
}
|
||||
} else {
|
||||
int32_t slot = curTimeWindowIndex(pWindowResInfo);
|
||||
|
@ -522,23 +522,23 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
|
|||
}
|
||||
|
||||
if (w.skey > ts || w.ekey < ts) {
|
||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||
w.skey = taosGetIntervalStartTimestamp(ts, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
||||
w.ekey = taosAddNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||
if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
|
||||
w.skey = taosTimeTruncate(ts, &pQuery->interval, pQuery->precision);
|
||||
w.ekey = taosTimeAdd(w.skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1;
|
||||
} else {
|
||||
int64_t st = w.skey;
|
||||
|
||||
if (st > ts) {
|
||||
st -= ((st - ts + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
|
||||
st -= ((st - ts + pQuery->interval.sliding - 1) / pQuery->interval.sliding) * pQuery->interval.sliding;
|
||||
}
|
||||
|
||||
int64_t et = st + pQuery->intervalTime - 1;
|
||||
int64_t et = st + pQuery->interval.interval - 1;
|
||||
if (et < ts) {
|
||||
st += ((ts - et + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
|
||||
st += ((ts - et + pQuery->interval.sliding - 1) / pQuery->interval.sliding) * pQuery->interval.sliding;
|
||||
}
|
||||
|
||||
w.skey = st;
|
||||
w.ekey = w.skey + pQuery->intervalTime - 1;
|
||||
w.ekey = w.skey + pQuery->interval.interval - 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -848,7 +848,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
|
|||
|
||||
int32_t startPos = 0;
|
||||
// tumbling time window query, a special case of sliding time window query
|
||||
if (pQuery->slidingTime == pQuery->intervalTime && prevPosition != -1) {
|
||||
if (pQuery->interval.sliding == pQuery->interval.interval && prevPosition != -1) {
|
||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
startPos = prevPosition + factor;
|
||||
} else {
|
||||
|
@ -861,21 +861,21 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
|
|||
*/
|
||||
if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNext->ekey) {
|
||||
TSKEY next = primaryKeys[startPos];
|
||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||
pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
||||
pNext->ekey = taosAddNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||
if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
|
||||
pNext->skey = taosTimeTruncate(next, &pQuery->interval, pQuery->precision);
|
||||
pNext->ekey = taosTimeAdd(pNext->skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1;
|
||||
} else {
|
||||
pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
|
||||
pNext->skey = pNext->ekey - pQuery->intervalTime + 1;
|
||||
pNext->ekey += ((next - pNext->ekey + pQuery->interval.sliding - 1)/pQuery->interval.sliding) * pQuery->interval.sliding;
|
||||
pNext->skey = pNext->ekey - pQuery->interval.interval + 1;
|
||||
}
|
||||
} else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNext->skey) {
|
||||
TSKEY next = primaryKeys[startPos];
|
||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||
pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
||||
pNext->ekey = taosAddNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||
if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
|
||||
pNext->skey = taosTimeTruncate(next, &pQuery->interval, pQuery->precision);
|
||||
pNext->ekey = taosTimeAdd(pNext->skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1;
|
||||
} else {
|
||||
pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
|
||||
pNext->ekey = pNext->skey + pQuery->intervalTime - 1;
|
||||
pNext->skey -= ((pNext->skey - next + pQuery->interval.sliding - 1) / pQuery->interval.sliding) * pQuery->interval.sliding;
|
||||
pNext->ekey = pNext->skey + pQuery->interval.interval - 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1871,20 +1871,20 @@ static bool onlyQueryTags(SQuery* pQuery) {
|
|||
/////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win) {
|
||||
assert(key >= keyFirst && key <= keyLast && pQuery->slidingTime <= pQuery->intervalTime);
|
||||
win->skey = taosGetIntervalStartTimestamp(key, pQuery->slidingTime, pQuery->intervalTime, pQuery->slidingTimeUnit, pQuery->precision);
|
||||
assert(key >= keyFirst && key <= keyLast && pQuery->interval.sliding <= pQuery->interval.interval);
|
||||
win->skey = taosTimeTruncate(key, &pQuery->interval, pQuery->precision);
|
||||
|
||||
/*
|
||||
* if the realSkey > INT64_MAX - pQuery->intervalTime, the query duration between
|
||||
* if the realSkey > INT64_MAX - pQuery->interval.interval, the query duration between
|
||||
* realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
|
||||
*/
|
||||
if (keyFirst > (INT64_MAX - pQuery->intervalTime)) {
|
||||
assert(keyLast - keyFirst < pQuery->intervalTime);
|
||||
if (keyFirst > (INT64_MAX - pQuery->interval.interval)) {
|
||||
assert(keyLast - keyFirst < pQuery->interval.interval);
|
||||
win->ekey = INT64_MAX;
|
||||
} else if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||
win->ekey = taosAddNatualInterval(win->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||
} else if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
|
||||
win->ekey = taosTimeAdd(win->skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1;
|
||||
} else {
|
||||
win->ekey = win->skey + pQuery->intervalTime - 1;
|
||||
win->ekey = win->skey + pQuery->interval.interval - 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1998,7 +1998,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
|
|||
return;
|
||||
}
|
||||
|
||||
if (isPointInterpoQuery(pQuery) && pQuery->intervalTime == 0) {
|
||||
if (isPointInterpoQuery(pQuery) && pQuery->interval.interval == 0) {
|
||||
if (!QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
qDebug(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey,
|
||||
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
|
||||
|
@ -2009,7 +2009,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
|
|||
return;
|
||||
}
|
||||
|
||||
if (pQuery->intervalTime == 0) {
|
||||
if (pQuery->interval.interval == 0) {
|
||||
if (onlyFirstQuery(pQuery)) {
|
||||
if (!QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
qDebug(msg, GET_QINFO_ADDR(pQuery), "only-first", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey,
|
||||
|
@ -4269,8 +4269,8 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
|||
}
|
||||
|
||||
/*
|
||||
* 1. for interval without interpolation query we forward pQuery->intervalTime at a time for
|
||||
* pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is
|
||||
* 1. for interval without interpolation query we forward pQuery->interval.interval at a time for
|
||||
* pQuery->limit.offset times. Since hole exists, pQuery->interval.interval*pQuery->limit.offset value is
|
||||
* not valid. otherwise, we only forward pQuery->limit.offset number of points
|
||||
*/
|
||||
assert(pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL);
|
||||
|
@ -4569,7 +4569,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
getAlignQueryTimeWindow(pQuery, pQuery->window.skey, sk, ek, &w);
|
||||
|
||||
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pQuery->rec.capacity, pQuery->numOfOutput,
|
||||
pQuery->slidingTime, pQuery->slidingTimeUnit, (int8_t)pQuery->precision,
|
||||
pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision,
|
||||
pQuery->fillType, pColInfo);
|
||||
}
|
||||
|
||||
|
@ -5430,7 +5430,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
|
|||
(isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pRuntimeEnv->groupbyNormalCol))) {
|
||||
multiTableQueryProcess(pQInfo);
|
||||
} else {
|
||||
assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
|
||||
assert((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) ||
|
||||
isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol);
|
||||
|
||||
sequentialTableProcess(pQInfo);
|
||||
|
@ -5476,8 +5476,8 @@ bool validateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SC
|
|||
}
|
||||
|
||||
static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) {
|
||||
if (pQueryMsg->intervalTime < 0) {
|
||||
qError("qmsg:%p illegal value of interval time %" PRId64, pQueryMsg, pQueryMsg->intervalTime);
|
||||
if (pQueryMsg->interval.interval < 0) {
|
||||
qError("qmsg:%p illegal value of interval time %" PRId64, pQueryMsg, pQueryMsg->interval.interval);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -5556,8 +5556,12 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
|
||||
pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey);
|
||||
pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey);
|
||||
pQueryMsg->intervalTime = htobe64(pQueryMsg->intervalTime);
|
||||
pQueryMsg->slidingTime = htobe64(pQueryMsg->slidingTime);
|
||||
pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval);
|
||||
pQueryMsg->interval.sliding = htobe64(pQueryMsg->interval.sliding);
|
||||
pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset);
|
||||
pQueryMsg->interval.intervalUnit = pQueryMsg->interval.intervalUnit;
|
||||
pQueryMsg->interval.slidingUnit = pQueryMsg->interval.slidingUnit;
|
||||
pQueryMsg->interval.offsetUnit = pQueryMsg->interval.offsetUnit;
|
||||
pQueryMsg->limit = htobe64(pQueryMsg->limit);
|
||||
pQueryMsg->offset = htobe64(pQueryMsg->offset);
|
||||
|
||||
|
@ -5770,7 +5774,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, "
|
||||
"outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64,
|
||||
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
|
||||
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
|
||||
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->interval.interval,
|
||||
pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -6109,10 +6113,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
pQuery->order.orderColId = pQueryMsg->orderColId;
|
||||
pQuery->pSelectExpr = pExprs;
|
||||
pQuery->pGroupbyExpr = pGroupbyExpr;
|
||||
pQuery->intervalTime = pQueryMsg->intervalTime;
|
||||
pQuery->slidingTime = pQueryMsg->slidingTime;
|
||||
pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit;
|
||||
pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit;
|
||||
memcpy(&pQuery->interval, &pQueryMsg->interval, sizeof(pQuery->interval));
|
||||
pQuery->fillType = pQueryMsg->fillType;
|
||||
pQuery->numOfTags = pQueryMsg->numOfTags;
|
||||
pQuery->tagColList = pTagCols;
|
||||
|
|
|
@ -38,8 +38,8 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
|
|||
pFillInfo->numOfTags = numOfTags;
|
||||
pFillInfo->numOfCols = numOfCols;
|
||||
pFillInfo->precision = precision;
|
||||
pFillInfo->slidingTime = slidingTime;
|
||||
pFillInfo->slidingUnit = slidingUnit;
|
||||
pFillInfo->interval.sliding = slidingTime;
|
||||
pFillInfo->interval.slidingUnit = slidingUnit;
|
||||
|
||||
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
|
||||
if (numOfTags > 0) {
|
||||
|
@ -121,7 +121,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
|
|||
return;
|
||||
}
|
||||
|
||||
pFillInfo->endKey = taosGetRevisedEndKey(endKey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit,
|
||||
pFillInfo->endKey = taosGetRevisedEndKey(endKey, pFillInfo->order, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit,
|
||||
pFillInfo->precision);
|
||||
|
||||
pFillInfo->rowIdx = 0;
|
||||
|
@ -172,17 +172,17 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows
|
|||
|
||||
int32_t numOfRows = taosNumOfRemainRows(pFillInfo);
|
||||
|
||||
TSKEY ekey1 = taosGetRevisedEndKey(ekey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit,
|
||||
TSKEY ekey1 = taosGetRevisedEndKey(ekey, pFillInfo->order, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit,
|
||||
pFillInfo->precision);
|
||||
|
||||
int64_t numOfRes = -1;
|
||||
if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set.
|
||||
TSKEY lastKey = tsList[pFillInfo->numOfRows - 1];
|
||||
|
||||
if (pFillInfo->slidingUnit != 'y' && pFillInfo->slidingUnit != 'n') {
|
||||
numOfRes = (int64_t)(ABS(lastKey - pFillInfo->start) / pFillInfo->slidingTime) + 1;
|
||||
if (pFillInfo->interval.slidingUnit != 'y' && pFillInfo->interval.slidingUnit != 'n') {
|
||||
numOfRes = (int64_t)(ABS(lastKey - pFillInfo->start) / pFillInfo->interval.sliding) + 1;
|
||||
} else {
|
||||
numOfRes = taosCountNatualInterval(lastKey, pFillInfo->start, pFillInfo->slidingTime, pFillInfo->slidingUnit, pFillInfo->precision) + 1;
|
||||
numOfRes = taosCountNatualInterval(lastKey, pFillInfo->start, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->precision) + 1;
|
||||
}
|
||||
assert(numOfRes >= numOfRows);
|
||||
} else { // reach the end of data
|
||||
|
@ -191,10 +191,10 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows
|
|||
return 0;
|
||||
}
|
||||
// the numOfRes rows are all filled with specified policy
|
||||
if (pFillInfo->slidingUnit != 'y' && pFillInfo->slidingUnit != 'n') {
|
||||
numOfRes = (ABS(ekey1 - pFillInfo->start) / pFillInfo->slidingTime) + 1;
|
||||
if (pFillInfo->interval.slidingUnit != 'y' && pFillInfo->interval.slidingUnit != 'n') {
|
||||
numOfRes = (ABS(ekey1 - pFillInfo->start) / pFillInfo->interval.sliding) + 1;
|
||||
} else {
|
||||
numOfRes = taosCountNatualInterval(ekey1, pFillInfo->start, pFillInfo->slidingTime, pFillInfo->slidingUnit, pFillInfo->precision) + 1;
|
||||
numOfRes = taosCountNatualInterval(ekey1, pFillInfo->start, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->precision) + 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -375,10 +375,10 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu
|
|||
}
|
||||
|
||||
// TODO natual sliding time
|
||||
if (pFillInfo->slidingUnit != 'n' && pFillInfo->slidingUnit != 'y') {
|
||||
pFillInfo->start += (pFillInfo->slidingTime * step);
|
||||
if (pFillInfo->interval.slidingUnit != 'n' && pFillInfo->interval.slidingUnit != 'y') {
|
||||
pFillInfo->start += (pFillInfo->interval.sliding * step);
|
||||
} else {
|
||||
pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->slidingTime*step, pFillInfo->slidingUnit, pFillInfo->precision);
|
||||
pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->interval.sliding*step, pFillInfo->interval.slidingUnit, pFillInfo->precision);
|
||||
}
|
||||
pFillInfo->numOfCurrent++;
|
||||
|
||||
|
@ -487,10 +487,10 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
|
|||
setTagsValue(pFillInfo, data, num);
|
||||
|
||||
// TODO natual sliding time
|
||||
if (pFillInfo->slidingUnit != 'n' && pFillInfo->slidingUnit != 'y') {
|
||||
pFillInfo->start += (pFillInfo->slidingTime * step);
|
||||
if (pFillInfo->interval.slidingUnit != 'n' && pFillInfo->interval.slidingUnit != 'y') {
|
||||
pFillInfo->start += (pFillInfo->interval.sliding * step);
|
||||
} else {
|
||||
pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->slidingTime*step, pFillInfo->slidingUnit, pFillInfo->precision);
|
||||
pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->interval.sliding*step, pFillInfo->interval.slidingUnit, pFillInfo->precision);
|
||||
}
|
||||
pFillInfo->rowIdx += 1;
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
|
|||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pWindowResInfo->interval = pRuntimeEnv->pQuery->intervalTime;
|
||||
pWindowResInfo->interval = pRuntimeEnv->pQuery->interval.interval;
|
||||
|
||||
pSummary->internalSupSize += sizeof(SWindowResult) * threshold;
|
||||
pSummary->internalSupSize += (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * pWindowResInfo->capacity;
|
||||
|
|
Loading…
Reference in New Issue