From fb19257a9970f3a2671180edfa6f9a5c6f6d0048 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Thu, 27 Aug 2020 17:01:26 +0800 Subject: [PATCH 1/4] td-1099: parse natual month/year --- src/client/inc/tsclient.h | 3 +- src/client/src/tscSQLParser.c | 60 +++++++++++++++++++---------------- src/os/inc/osTime.h | 1 + src/os/src/detail/osTime.c | 19 +++++++++++ 4 files changed, 55 insertions(+), 28 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d2c52e972a..d0141835a3 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -229,8 +229,9 @@ typedef struct STableDataBlocks { typedef struct SQueryInfo { int16_t command; // the command may be different for each subclause, so keep it seperately. - uint32_t type; // query/insert type + char intervalTimeUnit; char slidingTimeUnit; + uint32_t type; // query/insert type STimeWindow window; // query time window int64_t intervalTime; // aggregation time interval int64_t slidingTime; // sliding window in mseconds diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 5ce4c7125f..1fb8594588 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -586,22 +586,21 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ } // interval is not null - SStrToken* t = &pQuerySql->interval; - if (getTimestampInUsFromStr(t->z, t->n, &pQueryInfo->intervalTime) != TSDB_CODE_SUCCESS) { + SSQLToken* t = &pQuerySql->interval; + if (parseDuration(t->z, t->n, &pQueryInfo->intervalTime, &pQueryInfo->intervalTimeUnit) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } - // if the unit of time window value is millisecond, change the value from microsecond - if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { - pQueryInfo->intervalTime = pQueryInfo->intervalTime / 1000; - } + if (pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y') { + // if the unit of time window value is millisecond, change the value from microsecond + if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { + pQueryInfo->intervalTime = pQueryInfo->intervalTime / 1000; + } - /* parser has filter the illegal type, no need to check here */ - pQueryInfo->slidingTimeUnit = pQuerySql->interval.z[pQuerySql->interval.n - 1]; - - // interval cannot be less than 10 milliseconds - if (pQueryInfo->intervalTime < tsMinIntervalTime) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + // interval cannot be less than 10 milliseconds + if (pQueryInfo->intervalTime < tsMinIntervalTime) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + } } // for top/bottom + interval query, we do not add additional timestamp column in the front @@ -666,28 +665,35 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu const char* msg0 = "sliding value too small"; const char* msg1 = "sliding value no larger than the interval value"; const char* msg2 = "sliding value can not less than 1% of interval value"; + const char* msg3 = "does not support sliding when interval is natual month/year"; const static int32_t INTERVAL_SLIDING_FACTOR = 100; STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - SStrToken* pSliding = &pQuerySql->sliding; - if (pSliding->n != 0) { - getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->slidingTime); - if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { - pQueryInfo->slidingTime /= 1000; - } - - if (pQueryInfo->slidingTime < tsMinSlidingTime) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0); - } - - if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); - } - } else { + SSQLToken* pSliding = &pQuerySql->sliding; + if (pSliding->n == 0) { + pQueryInfo->slidingTimeUnit = pQueryInfo->intervalTimeUnit; pQueryInfo->slidingTime = pQueryInfo->intervalTime; + return TSDB_CODE_SUCCESS; + } + + if (pQueryInfo->intervalTimeUnit == 'n' || pQueryInfo->intervalTimeUnit == 'y') { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); + } + + getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->slidingTime); + if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { + pQueryInfo->slidingTime /= 1000; + } + + if (pQueryInfo->slidingTime < tsMinSlidingTime) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0); + } + + if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } if ((pQueryInfo->intervalTime != 0) && (pQueryInfo->intervalTime/pQueryInfo->slidingTime > INTERVAL_SLIDING_FACTOR)) { diff --git a/src/os/inc/osTime.h b/src/os/inc/osTime.h index cd2553f753..97432ca241 100644 --- a/src/os/inc/osTime.h +++ b/src/os/inc/osTime.h @@ -64,6 +64,7 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) { } int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts); +int32_t parseDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit); int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); void deltaToUtcInitOnce(); diff --git a/src/os/src/detail/osTime.c b/src/os/src/detail/osTime.c index 57634e468a..9d8328a71b 100644 --- a/src/os/src/detail/osTime.c +++ b/src/os/src/detail/osTime.c @@ -319,6 +319,8 @@ int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec) { *time = factor * seconds + fraction; return 0; } + + static int32_t getTimestampInUsFromStrImpl(int64_t val, char unit, int64_t* result) { *result = val; @@ -384,6 +386,23 @@ int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts) { return getTimestampInUsFromStrImpl(timestamp, token[tokenlen - 1], ts); } +int32_t parseDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit) { + errno = 0; + + /* get the basic numeric value */ + *duration = strtoll(token, NULL, 10); + if (errno != 0) { + return -1; + } + + *unit = token[tokenLen - 1]; + if (*unit == 'n' || *unit == 'y') { + return 0; + } + + return getTimestampInUsFromStrImpl(*duration, *unit, duration); +} + // internal function, when program is paused in debugger, // one can call this function from debugger to print a // timestamp as human readable string, for example (gdb): From b3840db93ed889e12c961432af3f77399f1d8a6e Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Sat, 29 Aug 2020 13:57:28 +0800 Subject: [PATCH 2/4] td-1099: natual month/year processing --- src/client/inc/tscUtil.h | 2 + src/client/src/tscSQLParser.c | 6 +- src/client/src/tscServer.c | 1 + src/client/src/tscSubquery.c | 4 + src/client/src/tscUtil.c | 1 + src/common/src/tname.c | 69 ++++++++++++----- src/inc/taosmsg.h | 1 + src/query/inc/qExecutor.h | 3 +- src/query/src/qExecutor.c | 135 +++++++++++++++++++++++++++------- 9 files changed, 173 insertions(+), 49 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index f77897a74b..46a576fa9a 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -70,6 +70,8 @@ typedef struct SJoinSupporter { SSubqueryState* pState; SSqlObj* pObj; // parent SqlObj int32_t subqueryIndex; // index of sub query + char intervalTimeUnit; + char slidingTimeUnit; int64_t intervalTime; // interval time int64_t slidingTime; // sliding time SLimitVal limit; // limit info diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 1fb8594588..3e99b644bd 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4681,7 +4681,9 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { const char* msg0 = "sample interval can not be less than 10ms."; const char* msg1 = "functions not allowed in select clause"; - if (pQueryInfo->intervalTime != 0 && pQueryInfo->intervalTime < 10) { + if (pQueryInfo->intervalTime != 0 && pQueryInfo->intervalTime < 10 && + pQueryInfo->intervalTimeUnit != 'n' && + pQueryInfo->intervalTimeUnit != 'y') { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0); } @@ -6167,7 +6169,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } - if (pQueryInfo->intervalTime > 0) { + if (pQueryInfo->intervalTime > 0 && pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y') { int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey); // number of result is not greater than 10,000,000 if ((timeRange == 0) || (timeRange / pQueryInfo->intervalTime) > MAX_INTERVAL_TIME_WINDOW) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ecb85472fc..b36767dbb4 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -673,6 +673,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->numOfCols = htons((int16_t)taosArrayGetSize(pQueryInfo->colList)); pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); + pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit; pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit; pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->numOfTags = htonl(numOfTags); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 2fb264c756..7a626bfe5c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -178,6 +178,8 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in pSupporter->subqueryIndex = index; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); + pSupporter->intervalTimeUnit = pQueryInfo->intervalTimeUnit; + pSupporter->slidingTime = pQueryInfo->slidingTimeUnit; pSupporter->intervalTime = pQueryInfo->intervalTime; pSupporter->slidingTime = pQueryInfo->slidingTime; pSupporter->limit = pQueryInfo->limit; @@ -309,6 +311,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // set the second stage sub query for join process TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE); + pQueryInfo->intervalTimeUnit = pSupporter->intervalTimeUnit; + pQueryInfo->slidingTimeUnit = pSupporter->slidingTimeUnit; pQueryInfo->intervalTime = pSupporter->intervalTime; pQueryInfo->slidingTime = pSupporter->slidingTime; pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b61fd7e8c9..49f7c91397 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1830,6 +1830,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pNewQueryInfo->command = pQueryInfo->command; + pNewQueryInfo->intervalTimeUnit = pQueryInfo->intervalTimeUnit; pNewQueryInfo->slidingTimeUnit = pQueryInfo->slidingTimeUnit; pNewQueryInfo->intervalTime = pQueryInfo->intervalTime; pNewQueryInfo->slidingTime = pQueryInfo->slidingTime; diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 01945dbb00..0c89d26bed 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -104,29 +104,60 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in if (slidingTime == 0) { return startTime; } + int64_t start = startTime; + if (timeUnit == 'n' || timeUnit == 'y') { + start /= 1000; + if (precision == TSDB_TIME_PRECISION_MICRO) { + start /= 1000; + } + struct tm tm; + time_t t = (time_t)start; + localtime_r(&t, &tm); + tm.tm_sec = 0; + tm.tm_min = 0; + tm.tm_hour = 0; + tm.tm_mday = 1; - int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime; - if (!(timeUnit == 'u' || timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) { - /* - * here we revised the start time of day according to the local time zone, - * but in case of DST, the start time of one day need to be dynamically decided. - */ - // todo refactor to extract function that is available for Linux/Windows/Mac platform -#if defined(WINDOWS) && _MSC_VER >= 1900 - // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 - int64_t timezone = _timezone; - int32_t daylight = _daylight; - char** tzname = _tzname; -#endif + if (timeUnit == 'y') { + tm.tm_mon = 0; + tm.tm_year = tm.tm_year / slidingTime * slidingTime; + } else { + int mon = tm.tm_year * 12 + tm.tm_mon; + mon = mon / slidingTime * slidingTime; + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + } - int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L; - start += timezone * t; + start = mktime(&tm) * 1000L; + if (precision == TSDB_TIME_PRECISION_MICRO) { + start *= 1000L; + } + } else { + start = ((start - intervalTime) / slidingTime + 1) * slidingTime; + + if (timeUnit == 'd' || timeUnit == 'w') { + /* + * here we revised the start time of day according to the local time zone, + * but in case of DST, the start time of one day need to be dynamically decided. + */ + // todo refactor to extract function that is available for Linux/Windows/Mac platform + #if defined(WINDOWS) && _MSC_VER >= 1900 + // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 + int64_t timezone = _timezone; + int32_t daylight = _daylight; + char** tzname = _tzname; + #endif + + int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L; + start += timezone * t; + } + + int64_t end = start + intervalTime - 1; + if (end < startTime) { + start += slidingTime; + } } - int64_t end = start + intervalTime - 1; - if (end < startTime) { - start += slidingTime; - } return start; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 761a267ce5..0fe63a740e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -456,6 +456,7 @@ typedef struct { int64_t intervalTime; // time interval for aggregation, in million second int64_t intervalOffset; // start offset for interval query int64_t slidingTime; // value for sliding window + char intervalTimeUnit; char slidingTimeUnit; // time interval type, for revisement of interval(1d) uint16_t tagCondLen; // tag length in current query int16_t numOfGroupCols; // num of group by columns diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 7093495763..25fb04fb9a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -132,11 +132,12 @@ typedef struct SQueryCostInfo { typedef struct SQuery { int16_t numOfCols; int16_t numOfTags; + char intervalTimeUnit; + char slidingTimeUnit; // interval data type, used for daytime revise SOrderVal order; STimeWindow window; int64_t intervalTime; int64_t slidingTime; // sliding time for sliding window query - char slidingTimeUnit; // interval data type, used for daytime revise int16_t precision; int16_t numOfOutput; int16_t fillType; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 4e2e31d269..6c4046d776 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -137,13 +137,75 @@ static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv); #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0) -// previous time window may not be of the same size of pQuery->intervalTime -#define GET_NEXT_TIMEWINDOW(_q, tw) \ - do { \ - int32_t factor = GET_FORWARD_DIRECTION_FACTOR((_q)->order.order); \ - (tw)->skey += ((_q)->slidingTime * factor); \ - (tw)->ekey = (tw)->skey + ((_q)->intervalTime - 1); \ - } while (0) +static int64_t addNatualInterval(int64_t key, int64_t intervalTime, char intervalTimeUnit, int precision) { + key /= 1000; + if (precision == TSDB_TIME_PRECISION_MICRO) { + key /= 1000; + } + + struct tm tm; + time_t t = (time_t)key; + localtime_r(&t, &tm); + + if (intervalTimeUnit == 'y') { + intervalTime *= 12; + } + + int mon = tm.tm_year * 12 + tm.tm_mon + intervalTime; + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + + key = mktime(&tm) * 1000L; + + if (precision == TSDB_TIME_PRECISION_MICRO) { + key *= 1000L; + } + + return key; +} + +static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { + int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + if (pQuery->intervalTimeUnit != 'n' && pQuery->intervalTimeUnit != 'y') { + tw->skey += pQuery->slidingTime * factor; + tw->ekey = tw->skey + pQuery->intervalTime - 1; + return; + } + + int64_t key = tw->skey; + key /= 1000; + if (pQuery->precision == TSDB_TIME_PRECISION_MICRO) { + key /= 1000; + } + + struct tm tm; + time_t t = (time_t)key; + localtime_r(&t, &tm); + + if (pQuery->intervalTimeUnit == 'y') { + factor *= 12; + } + + int mon = tm.tm_year * 12 + tm.tm_mon; + mon += pQuery->intervalTime * factor; + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + + tw->skey = mktime(&tm) * 1000L; + + mon += pQuery->intervalTime * factor; + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + tw->ekey = mktime(&tm) * 1000L; + + if (pQuery->precision == TSDB_TIME_PRECISION_MICRO) { + tw->skey *= 1000L; + tw->ekey *= 1000L; + } + tw->ekey -= 1; +} + +#define GET_NEXT_TIMEWINDOW(_q, tw) getNextTimeWindow((_q), (tw)) #define SET_STABLE_QUERY_OVER(_q) ((_q)->tableIndex = (int32_t)((_q)->tableqinfoGroupInfo.numOfTables)) #define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables)) @@ -467,9 +529,13 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) { STimeWindow w = {0}; - if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value + if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value w.skey = pWindowResInfo->prevSKey; - w.ekey = w.skey + pQuery->intervalTime - 1; + if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { + w.ekey = addNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + } else { + w.ekey = w.skey + pQuery->intervalTime - 1; + } } else { int32_t slot = curTimeWindowIndex(pWindowResInfo); SWindowResult* pWindowRes = getWindowResult(pWindowResInfo, slot); @@ -477,19 +543,24 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t } if (w.skey > ts || w.ekey < ts) { - int64_t st = w.skey; + if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { + w.skey = taosGetIntervalStartTimestamp(ts, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); + w.ekey = addNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + } else { + int64_t st = w.skey; - if (st > ts) { - st -= ((st - ts + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; + if (st > ts) { + st -= ((st - ts + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; + } + + int64_t et = st + pQuery->intervalTime - 1; + if (et < ts) { + st += ((ts - et + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; + } + + w.skey = st; + w.ekey = w.skey + pQuery->intervalTime - 1; } - - int64_t et = st + pQuery->intervalTime - 1; - if (et < ts) { - st += ((ts - et + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; - } - - w.skey = st; - w.ekey = w.skey + pQuery->intervalTime - 1; } /* @@ -814,14 +885,22 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow */ if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNext->ekey) { TSKEY next = primaryKeys[startPos]; - - pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime; - pNext->skey = pNext->ekey - pQuery->intervalTime + 1; + if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { + pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); + pNext->ekey = addNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + } else { + pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime; + pNext->skey = pNext->ekey - pQuery->intervalTime + 1; + } } else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNext->skey) { TSKEY next = primaryKeys[startPos]; - - pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; - pNext->ekey = pNext->skey + pQuery->intervalTime - 1; + if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { + pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); + pNext->ekey = addNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + } else { + pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; + pNext->ekey = pNext->skey + pQuery->intervalTime - 1; + } } return startPos; @@ -1804,7 +1883,8 @@ void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int6 if (keyFirst > (INT64_MAX - pQuery->intervalTime)) { assert(keyLast - keyFirst < pQuery->intervalTime); win->ekey = INT64_MAX; - return; + } else if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { + win->ekey = addNatualInterval(win->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; } else { win->ekey = win->skey + pQuery->intervalTime - 1; } @@ -6016,6 +6096,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQuery->pGroupbyExpr = pGroupbyExpr; pQuery->intervalTime = pQueryMsg->intervalTime; pQuery->slidingTime = pQueryMsg->slidingTime; + pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit; pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit; pQuery->fillType = pQueryMsg->fillType; pQuery->numOfTags = pQueryMsg->numOfTags; From 1afed4c067e30f25e9d623785390f03a72217f43 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Sat, 29 Aug 2020 17:18:14 +0800 Subject: [PATCH 3/4] td-1099: fix bug & compile error / add test cases --- src/client/src/tscSQLParser.c | 4 +- src/query/src/qExecutor.c | 16 +-- tests/pytest/query/natualInterval.py | 170 +++++++++++++++++++++++++++ 3 files changed, 178 insertions(+), 12 deletions(-) create mode 100644 tests/pytest/query/natualInterval.py diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 3e99b644bd..9c677dc555 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -586,7 +586,7 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ } // interval is not null - SSQLToken* t = &pQuerySql->interval; + SStrToken* t = &pQuerySql->interval; if (parseDuration(t->z, t->n, &pQueryInfo->intervalTime, &pQueryInfo->intervalTimeUnit) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -672,7 +672,7 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - SSQLToken* pSliding = &pQuerySql->sliding; + SStrToken* pSliding = &pQuerySql->sliding; if (pSliding->n == 0) { pQueryInfo->slidingTimeUnit = pQueryInfo->intervalTimeUnit; pQueryInfo->slidingTime = pQueryInfo->intervalTime; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6c4046d776..79a4243537 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -172,28 +172,24 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { return; } - int64_t key = tw->skey; - key /= 1000; + int64_t key = tw->skey / 1000, interval = pQuery->intervalTime; if (pQuery->precision == TSDB_TIME_PRECISION_MICRO) { key /= 1000; } + if (pQuery->intervalTimeUnit == 'y') { + interval *= 12; + } struct tm tm; time_t t = (time_t)key; localtime_r(&t, &tm); - if (pQuery->intervalTimeUnit == 'y') { - factor *= 12; - } - - int mon = tm.tm_year * 12 + tm.tm_mon; - mon += pQuery->intervalTime * factor; + int mon = tm.tm_year * 12 + tm.tm_mon + interval * factor; tm.tm_year = mon / 12; tm.tm_mon = mon % 12; - tw->skey = mktime(&tm) * 1000L; - mon += pQuery->intervalTime * factor; + mon += interval; tm.tm_year = mon / 12; tm.tm_mon = mon % 12; tw->ekey = mktime(&tm) * 1000L; diff --git a/tests/pytest/query/natualInterval.py b/tests/pytest/query/natualInterval.py new file mode 100644 index 0000000000..1ed91e1c68 --- /dev/null +++ b/tests/pytest/query/natualInterval.py @@ -0,0 +1,170 @@ +################################################################### +# Copyright (c) 2020 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import * +from util.cases import * +from util.sql import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + def singleTable(self): + tdSql.execute("create table car(ts timestamp, s int)") + tdSql.execute("insert into car values('2019-01-01 00:00:00', 1)") + tdSql.execute("insert into car values('2019-05-13 12:00:00', 1)") + tdSql.execute("insert into car values('2019-12-31 23:59:59', 1)") + tdSql.execute("insert into car values('2020-01-01 12:00:00', 1)") + tdSql.execute("insert into car values('2020-01-02 12:00:00', 1)") + tdSql.execute("insert into car values('2020-01-03 12:00:00', 1)") + tdSql.execute("insert into car values('2020-01-04 12:00:00', 1)") + tdSql.execute("insert into car values('2020-01-05 12:00:00', 1)") + tdSql.execute("insert into car values('2020-01-31 12:00:00', 1)") + tdSql.execute("insert into car values('2020-02-01 12:00:00', 1)") + tdSql.execute("insert into car values('2020-02-02 12:00:00', 1)") + tdSql.execute("insert into car values('2020-02-29 12:00:00', 1)") + tdSql.execute("insert into car values('2020-03-01 12:00:00', 1)") + tdSql.execute("insert into car values('2020-03-02 12:00:00', 1)") + tdSql.execute("insert into car values('2020-03-15 12:00:00', 1)") + tdSql.execute("insert into car values('2020-03-31 12:00:00', 1)") + tdSql.execute("insert into car values('2020-05-01 12:00:00', 1)") + + tdSql.query("select count(*) from car interval(1n)") + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, 1) + tdSql.checkData(2, 1, 1) + tdSql.checkData(3, 1, 6) + tdSql.checkData(4, 1, 3) + tdSql.checkData(5, 1, 4) + tdSql.checkData(6, 1, 1) + + tdSql.query("select count(*) from car interval(1n) order by ts desc") + tdSql.checkData(6, 1, 1) + tdSql.checkData(5, 1, 1) + tdSql.checkData(4, 1, 1) + tdSql.checkData(3, 1, 6) + tdSql.checkData(2, 1, 3) + tdSql.checkData(1, 1, 4) + tdSql.checkData(0, 1, 1) + + tdSql.query("select count(*) from car interval(2n)") + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, 1) + tdSql.checkData(2, 1, 1) + tdSql.checkData(3, 1, 9) + tdSql.checkData(4, 1, 4) + tdSql.checkData(5, 1, 1) + + tdSql.query("select count(*) from car interval(2n) order by ts desc") + tdSql.checkData(5, 1, 1) + tdSql.checkData(4, 1, 1) + tdSql.checkData(3, 1, 1) + tdSql.checkData(2, 1, 9) + tdSql.checkData(1, 1, 4) + tdSql.checkData(0, 1, 1) + + tdSql.query("select count(*) from car interval(1y)") + tdSql.checkData(0, 1, 3) + tdSql.checkData(1, 1, 14) + + tdSql.query("select count(*) from car interval(2y)") + tdSql.checkData(0, 1, 3) + tdSql.checkData(1, 1, 14) + + + def superTable(self): + tdSql.execute("create table cars(ts timestamp, s int) tags(id int)") + tdSql.execute("create table car0 using cars tags(0)") + tdSql.execute("create table car1 using cars tags(0)") + tdSql.execute("create table car2 using cars tags(0)") + tdSql.execute("create table car3 using cars tags(0)") + tdSql.execute("create table car4 using cars tags(0)") + + tdSql.execute("insert into car0 values('2019-01-01 00:00:00', 1)") + tdSql.execute("insert into car1 values('2019-05-13 12:00:00', 1)") + tdSql.execute("insert into car2 values('2019-12-31 23:59:59', 1)") + tdSql.execute("insert into car1 values('2020-01-01 12:00:00', 1)") + tdSql.execute("insert into car1 values('2020-01-02 12:00:00', 1)") + tdSql.execute("insert into car1 values('2020-01-03 12:00:00', 1)") + tdSql.execute("insert into car1 values('2020-01-04 12:00:00', 1)") + tdSql.execute("insert into car1 values('2020-01-05 12:00:00', 1)") + tdSql.execute("insert into car1 values('2020-01-31 12:00:00', 1)") + tdSql.execute("insert into car1 values('2020-02-01 12:00:00', 1)") + tdSql.execute("insert into car2 values('2020-02-02 12:00:00', 1)") + tdSql.execute("insert into car2 values('2020-02-29 12:00:00', 1)") + tdSql.execute("insert into car3 values('2020-03-01 12:00:00', 1)") + tdSql.execute("insert into car3 values('2020-03-02 12:00:00', 1)") + tdSql.execute("insert into car3 values('2020-03-15 12:00:00', 1)") + tdSql.execute("insert into car4 values('2020-03-31 12:00:00', 1)") + tdSql.execute("insert into car3 values('2020-05-01 12:00:00', 1)") + + tdSql.query("select count(*) from cars interval(1n)") + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, 1) + tdSql.checkData(2, 1, 1) + tdSql.checkData(3, 1, 6) + tdSql.checkData(4, 1, 3) + tdSql.checkData(5, 1, 4) + tdSql.checkData(6, 1, 1) + + tdSql.query("select count(*) from cars interval(1n) order by ts desc") + tdSql.checkData(6, 1, 1) + tdSql.checkData(5, 1, 1) + tdSql.checkData(4, 1, 1) + tdSql.checkData(3, 1, 6) + tdSql.checkData(2, 1, 3) + tdSql.checkData(1, 1, 4) + tdSql.checkData(0, 1, 1) + + tdSql.query("select count(*) from cars interval(2n)") + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, 1) + tdSql.checkData(2, 1, 1) + tdSql.checkData(3, 1, 9) + tdSql.checkData(4, 1, 4) + tdSql.checkData(5, 1, 1) + + tdSql.query("select count(*) from cars interval(2n) order by ts desc") + tdSql.checkData(5, 1, 1) + tdSql.checkData(4, 1, 1) + tdSql.checkData(3, 1, 1) + tdSql.checkData(2, 1, 9) + tdSql.checkData(1, 1, 4) + tdSql.checkData(0, 1, 1) + + tdSql.query("select count(*) from cars interval(1y)") + tdSql.checkData(0, 1, 3) + tdSql.checkData(1, 1, 14) + + tdSql.query("select count(*) from cars interval(2y)") + tdSql.checkData(0, 1, 3) + tdSql.checkData(1, 1, 14) + + + def run(self): + tdSql.prepare() + self.singleTable() + self.superTable() + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) From e9673320961e5e9c92acc3b3ee158ed325111b71 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Mon, 31 Aug 2020 17:12:50 +0800 Subject: [PATCH 4/4] td-1099: natual interval in stream and bug fix --- src/client/inc/tsclient.h | 4 +- src/client/src/tscProfile.c | 4 +- src/client/src/tscStream.c | 100 +++++++++++++++++++++--------------- src/common/inc/tname.h | 2 + src/common/src/tname.c | 56 ++++++++++++++++++++ src/query/src/qExecutor.c | 37 ++----------- src/query/src/qFill.c | 26 ++++++++-- 7 files changed, 149 insertions(+), 80 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d0141835a3..900d4955e9 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -367,6 +367,8 @@ typedef struct SSqlStream { uint32_t streamId; char listed; bool isProject; + char intervalTimeUnit; + char slidingTimeUnit; int16_t precision; int64_t num; // number of computing count @@ -380,7 +382,7 @@ typedef struct SSqlStream { int64_t ctime; // stream created time int64_t stime; // stream next executed time int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed - int64_t interval; + int64_t intervalTime; int64_t slidingTime; void * pTimer; diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 6ff97e9d00..b8c3830204 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -259,11 +259,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { pSdesc->num = htobe64(pStream->num); pSdesc->useconds = htobe64(pStream->useconds); - pSdesc->stime = htobe64(pStream->stime - pStream->interval); + pSdesc->stime = htobe64(pStream->stime - pStream->intervalTime); pSdesc->ctime = htobe64(pStream->ctime); pSdesc->slidingTime = htobe64(pStream->slidingTime); - pSdesc->interval = htobe64(pStream->interval); + pSdesc->interval = htobe64(pStream->intervalTime); pHeartbeat->numOfStreams++; pSdesc++; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 9dd47888d2..ea979bfae3 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -46,22 +46,23 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) { return true; } -static int64_t tscGetRetryDelayTime(int64_t slidingTime, int16_t prec) { +static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, int16_t prec) { float retryRangeFactor = 0.3f; - - // change to ms - if (prec == TSDB_TIME_PRECISION_MICRO) { - slidingTime = slidingTime / 1000; - } - int64_t retryDelta = (int64_t)(tsStreamCompRetryDelay * retryRangeFactor); retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L; - if (slidingTime < retryDelta) { - return slidingTime; - } else { - return retryDelta; + if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') { + // change to ms + if (prec == TSDB_TIME_PRECISION_MICRO) { + slidingTime = slidingTime / 1000; + } + + if (slidingTime < retryDelta) { + return slidingTime; + } } + + return retryDelta; } static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { @@ -86,7 +87,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { // failed to get meter/metric meta, retry in 10sec. if (code != TSDB_CODE_SUCCESS) { - int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); + int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision); tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime); tscSetRetryTimer(pStream, pSql, retryDelayTime); @@ -131,13 +132,17 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { } if (etime > pStream->etime) { etime = pStream->etime; + } else if (pStream->intervalTimeUnit != 'y' && pStream->intervalTimeUnit != 'n') { + etime = pStream->stime + (etime - pStream->stime) / pStream->intervalTime * pStream->intervalTime; } else { - etime = pStream->stime + (etime - pStream->stime) / pStream->interval * pStream->interval; + etime = taosGetIntervalStartTimestamp(etime, pStream->slidingTime, pStream->intervalTime, pStream->slidingTimeUnit, pStream->precision); } pQueryInfo->window.ekey = etime; if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) { int64_t timer = pStream->slidingTime; - if (pStream->precision == TSDB_TIME_PRECISION_MICRO) { + if (pStream->intervalTimeUnit == 'y' || pStream->intervalTimeUnit == 'n') { + timer = 86400 * 1000l; + } else if (pStream->precision == TSDB_TIME_PRECISION_MICRO) { timer /= 1000l; } tscSetRetryTimer(pStream, pSql, timer); @@ -157,7 +162,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) { SSqlStream *pStream = (SSqlStream *)param; if (tres == NULL || numOfRows < 0) { - int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); + int64_t retryDelay = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision); tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows, retryDelay); @@ -218,7 +223,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf SSqlObj * pSql = (SSqlObj *)res; if (pSql == NULL || numOfRows < 0) { - int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); + int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision); tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); @@ -241,7 +246,11 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf } if (!pStream->isProject) { - pStream->stime += pStream->slidingTime; + if (pStream->intervalTimeUnit == 'y' || pStream->intervalTimeUnit == 'n') { + pStream->stime = taosAddNatualInterval(pStream->stime, pStream->slidingTime, pStream->slidingTimeUnit, pStream->precision); + } else { + pStream->stime += pStream->slidingTime; + } } // actually only one row is returned. this following is not necessary taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream); @@ -301,7 +310,7 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) now + timer, timer, delay, pStream->stime, etime); } else { tscDebug("%p stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql, pStream, - pStream->stime, timer, delay, pStream->stime - pStream->interval, pStream->stime - 1); + pStream->stime, timer, delay, pStream->stime - pStream->intervalTime, pStream->stime - 1); } pSql->cmd.command = TSDB_SQL_SELECT; @@ -311,23 +320,26 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) } static int64_t getLaunchTimeDelay(const SSqlStream* pStream) { - int64_t delayDelta = (int64_t)(pStream->slidingTime * tsStreamComputDelayRatio); - int64_t maxDelay = (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay; - if (delayDelta > maxDelay) { - delayDelta = maxDelay; - } - - int64_t remainTimeWindow = pStream->slidingTime - delayDelta; - if (maxDelay > remainTimeWindow) { - maxDelay = (int64_t)(remainTimeWindow / 1.5f); + int64_t delayDelta = maxDelay; + if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') { + delayDelta = pStream->slidingTime * tsStreamComputDelayRatio; + if (delayDelta > maxDelay) { + delayDelta = maxDelay; + } + int64_t remainTimeWindow = pStream->slidingTime - delayDelta; + if (maxDelay > remainTimeWindow) { + maxDelay = (int64_t)(remainTimeWindow / 1.5f); + } } int64_t currentDelay = (rand() % maxDelay); // a random number currentDelay += delayDelta; - assert(currentDelay < pStream->slidingTime); + if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') { + assert(currentDelay < pStream->slidingTime); + } return currentDelay; } @@ -354,7 +366,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { return; } } else { - if ((pStream->stime - pStream->interval) >= pStream->etime) { + int64_t stime = taosGetIntervalStartTimestamp(pStream->stime - 1, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); + if (stime >= pStream->etime) { tscDebug("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream, pStream->stime, pStream->etime); // TODO : How to terminate stream here @@ -387,24 +400,24 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - if (pQueryInfo->intervalTime < minIntervalTime) { + if (pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y' && pQueryInfo->intervalTime < minIntervalTime) { tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%" PRId64, pSql, pStream, pQueryInfo->intervalTime, minIntervalTime); pQueryInfo->intervalTime = minIntervalTime; } - pStream->interval = pQueryInfo->intervalTime; // it shall be derived from sql string + pStream->intervalTimeUnit = pQueryInfo->intervalTimeUnit; + pStream->intervalTime = pQueryInfo->intervalTime; // it shall be derived from sql string - if (pQueryInfo->slidingTime == 0) { + if (pQueryInfo->slidingTime <= 0) { pQueryInfo->slidingTime = pQueryInfo->intervalTime; + pQueryInfo->slidingTimeUnit = pQueryInfo->intervalTimeUnit; } int64_t minSlidingTime = (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime; - if (pQueryInfo->slidingTime == -1) { - pQueryInfo->slidingTime = pQueryInfo->intervalTime; - } else if (pQueryInfo->slidingTime < minSlidingTime) { + if (pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y' && pQueryInfo->slidingTime < minSlidingTime) { tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream, pQueryInfo->slidingTime, minSlidingTime); @@ -418,6 +431,7 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { pQueryInfo->slidingTime = pQueryInfo->intervalTime; } + pStream->slidingTimeUnit = pQueryInfo->slidingTimeUnit; pStream->slidingTime = pQueryInfo->slidingTime; if (pStream->isProject) { @@ -431,7 +445,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in if (pStream->isProject) { // no data in table, flush all data till now to destination meter, 10sec delay - pStream->interval = tsProjectExecInterval; + pStream->intervalTime = tsProjectExecInterval; pStream->slidingTime = tsProjectExecInterval; if (stime != 0) { // first projection start from the latest event timestamp @@ -442,11 +456,15 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in } } else { // timewindow based aggregation stream if (stime == 0) { // no data in meter till now - stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval; - stime -= pStream->interval; - tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime); + stime = pQueryInfo->window.skey; + if (stime == INT64_MIN) { + stime = (int64_t)taosGetTimestamp(pStream->precision); + stime = taosGetIntervalStartTimestamp(stime, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); + stime = taosGetIntervalStartTimestamp(stime - 1, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); + tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime); + } } else { - int64_t newStime = (stime / pStream->interval) * pStream->interval; + int64_t newStime = taosGetIntervalStartTimestamp(stime, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision); if (newStime != stime) { tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime); stime = newStime; @@ -516,7 +534,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer); tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, - pStream, pTableMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, pSql->sqlstr); + pStream, pTableMetaInfo->name, pStream->intervalTime, pStream->slidingTime, starttime, pSql->sqlstr); } TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), diff --git a/src/common/inc/tname.h b/src/common/inc/tname.h index 2a4ac3fc40..beef9ff375 100644 --- a/src/common/inc/tname.h +++ b/src/common/inc/tname.h @@ -35,6 +35,8 @@ bool tscValidateTableNameLength(size_t len); SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters); +int64_t taosAddNatualInterval(int64_t key, int64_t intervalTime, char timeUnit, int16_t precision); +int32_t taosCountNatualInterval(int64_t skey, int64_t ekey, int64_t intervalTime, char timeUnit, int16_t precision); int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision); #endif // TDENGINE_NAME_H diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 0c89d26bed..248c996999 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -100,6 +100,62 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO return pFilter; } +int64_t taosAddNatualInterval(int64_t key, int64_t intervalTime, char timeUnit, int16_t precision) { + key /= 1000; + if (precision == TSDB_TIME_PRECISION_MICRO) { + key /= 1000; + } + + struct tm tm; + time_t t = (time_t)key; + localtime_r(&t, &tm); + + if (timeUnit == 'y') { + intervalTime *= 12; + } + + int mon = tm.tm_year * 12 + tm.tm_mon + intervalTime; + tm.tm_year = mon / 12; + tm.tm_mon = mon % 12; + + key = mktime(&tm) * 1000L; + + if (precision == TSDB_TIME_PRECISION_MICRO) { + key *= 1000L; + } + + return key; +} + +int32_t taosCountNatualInterval(int64_t skey, int64_t ekey, int64_t intervalTime, char timeUnit, int16_t precision) { + skey /= 1000; + ekey /= 1000; + if (precision == TSDB_TIME_PRECISION_MICRO) { + skey /= 1000; + ekey /= 1000; + } + if (ekey < skey) { + int64_t tmp = ekey; + ekey = skey; + skey = tmp; + } + + struct tm tm; + time_t t = (time_t)skey; + localtime_r(&t, &tm); + int smon = tm.tm_year * 12 + tm.tm_mon; + + t = (time_t)ekey; + localtime_r(&t, &tm); + int emon = tm.tm_year * 12 + tm.tm_mon; + + if (timeUnit == 'y') { + intervalTime *= 12; + } + + return (emon - smon) / (int32_t)intervalTime; +} + int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) { if (slidingTime == 0) { return startTime; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 79a4243537..e32991d504 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -137,33 +137,6 @@ static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv); #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0) -static int64_t addNatualInterval(int64_t key, int64_t intervalTime, char intervalTimeUnit, int precision) { - key /= 1000; - if (precision == TSDB_TIME_PRECISION_MICRO) { - key /= 1000; - } - - struct tm tm; - time_t t = (time_t)key; - localtime_r(&t, &tm); - - if (intervalTimeUnit == 'y') { - intervalTime *= 12; - } - - int mon = tm.tm_year * 12 + tm.tm_mon + intervalTime; - tm.tm_year = mon / 12; - tm.tm_mon = mon % 12; - - key = mktime(&tm) * 1000L; - - if (precision == TSDB_TIME_PRECISION_MICRO) { - key *= 1000L; - } - - return key; -} - static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (pQuery->intervalTimeUnit != 'n' && pQuery->intervalTimeUnit != 'y') { @@ -528,7 +501,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value w.skey = pWindowResInfo->prevSKey; if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { - w.ekey = addNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + w.ekey = taosAddNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; } else { w.ekey = w.skey + pQuery->intervalTime - 1; } @@ -541,7 +514,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t if (w.skey > ts || w.ekey < ts) { if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { w.skey = taosGetIntervalStartTimestamp(ts, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); - w.ekey = addNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + w.ekey = taosAddNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; } else { int64_t st = w.skey; @@ -883,7 +856,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow TSKEY next = primaryKeys[startPos]; if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); - pNext->ekey = addNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + pNext->ekey = taosAddNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; } else { pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime; pNext->skey = pNext->ekey - pQuery->intervalTime + 1; @@ -892,7 +865,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow TSKEY next = primaryKeys[startPos]; if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision); - pNext->ekey = addNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + pNext->ekey = taosAddNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; } else { pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime; pNext->ekey = pNext->skey + pQuery->intervalTime - 1; @@ -1880,7 +1853,7 @@ void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int6 assert(keyLast - keyFirst < pQuery->intervalTime); win->ekey = INT64_MAX; } else if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') { - win->ekey = addNatualInterval(win->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; + win->ekey = taosAddNatualInterval(win->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1; } else { win->ekey = win->skey + pQuery->intervalTime - 1; } diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index c1cfab3ea2..b3bb443fd8 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -179,14 +179,22 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set. TSKEY lastKey = tsList[pFillInfo->numOfRows - 1]; - numOfRes = (int64_t)(ABS(lastKey - pFillInfo->start) / pFillInfo->slidingTime) + 1; + if (pFillInfo->slidingUnit != 'y' && pFillInfo->slidingUnit != 'n') { + numOfRes = (int64_t)(ABS(lastKey - pFillInfo->start) / pFillInfo->slidingTime) + 1; + } else { + numOfRes = taosCountNatualInterval(lastKey, pFillInfo->start, pFillInfo->slidingTime, pFillInfo->slidingUnit, pFillInfo->precision) + 1; + } assert(numOfRes >= numOfRows); } else { // reach the end of data if ((ekey1 < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) || (ekey1 > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) { return 0; - } else { // the numOfRes rows are all filled with specified policy + } + // the numOfRes rows are all filled with specified policy + if (pFillInfo->slidingUnit != 'y' && pFillInfo->slidingUnit != 'n') { numOfRes = (ABS(ekey1 - pFillInfo->start) / pFillInfo->slidingTime) + 1; + } else { + numOfRes = taosCountNatualInterval(ekey1, pFillInfo->start, pFillInfo->slidingTime, pFillInfo->slidingUnit, pFillInfo->precision) + 1; } } @@ -366,7 +374,12 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu setTagsValue(pFillInfo, data, *num); } - pFillInfo->start += (pFillInfo->slidingTime * step); +// TODO natual sliding time + if (pFillInfo->slidingUnit != 'n' && pFillInfo->slidingUnit != 'y') { + pFillInfo->start += (pFillInfo->slidingTime * step); + } else { + pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->slidingTime*step, pFillInfo->slidingUnit, pFillInfo->precision); + } pFillInfo->numOfCurrent++; (*num) += 1; @@ -473,7 +486,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu // set the tag value for final result setTagsValue(pFillInfo, data, num); - pFillInfo->start += (pFillInfo->slidingTime * step); + // TODO natual sliding time + if (pFillInfo->slidingUnit != 'n' && pFillInfo->slidingUnit != 'y') { + pFillInfo->start += (pFillInfo->slidingTime * step); + } else { + pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->slidingTime*step, pFillInfo->slidingUnit, pFillInfo->precision); + } pFillInfo->rowIdx += 1; pFillInfo->numOfCurrent +=1;