td-1099: natual month/year processing
This commit is contained in:
parent
fb19257a99
commit
b3840db93e
|
@ -70,6 +70,8 @@ typedef struct SJoinSupporter {
|
|||
SSubqueryState* pState;
|
||||
SSqlObj* pObj; // parent SqlObj
|
||||
int32_t subqueryIndex; // index of sub query
|
||||
char intervalTimeUnit;
|
||||
char slidingTimeUnit;
|
||||
int64_t intervalTime; // interval time
|
||||
int64_t slidingTime; // sliding time
|
||||
SLimitVal limit; // limit info
|
||||
|
|
|
@ -4681,7 +4681,9 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
|||
const char* msg0 = "sample interval can not be less than 10ms.";
|
||||
const char* msg1 = "functions not allowed in select clause";
|
||||
|
||||
if (pQueryInfo->intervalTime != 0 && pQueryInfo->intervalTime < 10) {
|
||||
if (pQueryInfo->intervalTime != 0 && pQueryInfo->intervalTime < 10 &&
|
||||
pQueryInfo->intervalTimeUnit != 'n' &&
|
||||
pQueryInfo->intervalTimeUnit != 'y') {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0);
|
||||
}
|
||||
|
||||
|
@ -6167,7 +6169,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
|||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
}
|
||||
|
||||
if (pQueryInfo->intervalTime > 0) {
|
||||
if (pQueryInfo->intervalTime > 0 && pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y') {
|
||||
int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey);
|
||||
// number of result is not greater than 10,000,000
|
||||
if ((timeRange == 0) || (timeRange / pQueryInfo->intervalTime) > MAX_INTERVAL_TIME_WINDOW) {
|
||||
|
|
|
@ -673,6 +673,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pQueryMsg->numOfCols = htons((int16_t)taosArrayGetSize(pQueryInfo->colList));
|
||||
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
|
||||
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
|
||||
pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
|
||||
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
|
||||
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
|
||||
pQueryMsg->numOfTags = htonl(numOfTags);
|
||||
|
|
|
@ -178,6 +178,8 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
|
|||
pSupporter->subqueryIndex = index;
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
|
||||
|
||||
pSupporter->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
|
||||
pSupporter->slidingTime = pQueryInfo->slidingTimeUnit;
|
||||
pSupporter->intervalTime = pQueryInfo->intervalTime;
|
||||
pSupporter->slidingTime = pQueryInfo->slidingTime;
|
||||
pSupporter->limit = pQueryInfo->limit;
|
||||
|
@ -309,6 +311,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
|
|||
// set the second stage sub query for join process
|
||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
|
||||
|
||||
pQueryInfo->intervalTimeUnit = pSupporter->intervalTimeUnit;
|
||||
pQueryInfo->slidingTimeUnit = pSupporter->slidingTimeUnit;
|
||||
pQueryInfo->intervalTime = pSupporter->intervalTime;
|
||||
pQueryInfo->slidingTime = pSupporter->slidingTime;
|
||||
pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
|
||||
|
|
|
@ -1830,6 +1830,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
pNewQueryInfo->command = pQueryInfo->command;
|
||||
pNewQueryInfo->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
|
||||
pNewQueryInfo->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
|
||||
pNewQueryInfo->intervalTime = pQueryInfo->intervalTime;
|
||||
pNewQueryInfo->slidingTime = pQueryInfo->slidingTime;
|
||||
|
|
|
@ -104,29 +104,60 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
|
|||
if (slidingTime == 0) {
|
||||
return startTime;
|
||||
}
|
||||
int64_t start = startTime;
|
||||
if (timeUnit == 'n' || timeUnit == 'y') {
|
||||
start /= 1000;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
start /= 1000;
|
||||
}
|
||||
struct tm tm;
|
||||
time_t t = (time_t)start;
|
||||
localtime_r(&t, &tm);
|
||||
tm.tm_sec = 0;
|
||||
tm.tm_min = 0;
|
||||
tm.tm_hour = 0;
|
||||
tm.tm_mday = 1;
|
||||
|
||||
int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime;
|
||||
if (!(timeUnit == 'u' || timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) {
|
||||
/*
|
||||
* here we revised the start time of day according to the local time zone,
|
||||
* but in case of DST, the start time of one day need to be dynamically decided.
|
||||
*/
|
||||
// todo refactor to extract function that is available for Linux/Windows/Mac platform
|
||||
#if defined(WINDOWS) && _MSC_VER >= 1900
|
||||
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
|
||||
int64_t timezone = _timezone;
|
||||
int32_t daylight = _daylight;
|
||||
char** tzname = _tzname;
|
||||
#endif
|
||||
if (timeUnit == 'y') {
|
||||
tm.tm_mon = 0;
|
||||
tm.tm_year = tm.tm_year / slidingTime * slidingTime;
|
||||
} else {
|
||||
int mon = tm.tm_year * 12 + tm.tm_mon;
|
||||
mon = mon / slidingTime * slidingTime;
|
||||
tm.tm_year = mon / 12;
|
||||
tm.tm_mon = mon % 12;
|
||||
}
|
||||
|
||||
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
|
||||
start += timezone * t;
|
||||
start = mktime(&tm) * 1000L;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
start *= 1000L;
|
||||
}
|
||||
} else {
|
||||
start = ((start - intervalTime) / slidingTime + 1) * slidingTime;
|
||||
|
||||
if (timeUnit == 'd' || timeUnit == 'w') {
|
||||
/*
|
||||
* here we revised the start time of day according to the local time zone,
|
||||
* but in case of DST, the start time of one day need to be dynamically decided.
|
||||
*/
|
||||
// todo refactor to extract function that is available for Linux/Windows/Mac platform
|
||||
#if defined(WINDOWS) && _MSC_VER >= 1900
|
||||
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
|
||||
int64_t timezone = _timezone;
|
||||
int32_t daylight = _daylight;
|
||||
char** tzname = _tzname;
|
||||
#endif
|
||||
|
||||
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
|
||||
start += timezone * t;
|
||||
}
|
||||
|
||||
int64_t end = start + intervalTime - 1;
|
||||
if (end < startTime) {
|
||||
start += slidingTime;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t end = start + intervalTime - 1;
|
||||
if (end < startTime) {
|
||||
start += slidingTime;
|
||||
}
|
||||
return start;
|
||||
}
|
||||
|
||||
|
|
|
@ -456,6 +456,7 @@ typedef struct {
|
|||
int64_t intervalTime; // time interval for aggregation, in million second
|
||||
int64_t intervalOffset; // start offset for interval query
|
||||
int64_t slidingTime; // value for sliding window
|
||||
char intervalTimeUnit;
|
||||
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
|
||||
uint16_t tagCondLen; // tag length in current query
|
||||
int16_t numOfGroupCols; // num of group by columns
|
||||
|
|
|
@ -132,11 +132,12 @@ typedef struct SQueryCostInfo {
|
|||
typedef struct SQuery {
|
||||
int16_t numOfCols;
|
||||
int16_t numOfTags;
|
||||
char intervalTimeUnit;
|
||||
char slidingTimeUnit; // interval data type, used for daytime revise
|
||||
SOrderVal order;
|
||||
STimeWindow window;
|
||||
int64_t intervalTime;
|
||||
int64_t slidingTime; // sliding time for sliding window query
|
||||
char slidingTimeUnit; // interval data type, used for daytime revise
|
||||
int16_t precision;
|
||||
int16_t numOfOutput;
|
||||
int16_t fillType;
|
||||
|
|
|
@ -137,13 +137,75 @@ static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
|
|||
|
||||
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
|
||||
|
||||
// previous time window may not be of the same size of pQuery->intervalTime
|
||||
#define GET_NEXT_TIMEWINDOW(_q, tw) \
|
||||
do { \
|
||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR((_q)->order.order); \
|
||||
(tw)->skey += ((_q)->slidingTime * factor); \
|
||||
(tw)->ekey = (tw)->skey + ((_q)->intervalTime - 1); \
|
||||
} while (0)
|
||||
static int64_t addNatualInterval(int64_t key, int64_t intervalTime, char intervalTimeUnit, int precision) {
|
||||
key /= 1000;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
key /= 1000;
|
||||
}
|
||||
|
||||
struct tm tm;
|
||||
time_t t = (time_t)key;
|
||||
localtime_r(&t, &tm);
|
||||
|
||||
if (intervalTimeUnit == 'y') {
|
||||
intervalTime *= 12;
|
||||
}
|
||||
|
||||
int mon = tm.tm_year * 12 + tm.tm_mon + intervalTime;
|
||||
tm.tm_year = mon / 12;
|
||||
tm.tm_mon = mon % 12;
|
||||
|
||||
key = mktime(&tm) * 1000L;
|
||||
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
key *= 1000L;
|
||||
}
|
||||
|
||||
return key;
|
||||
}
|
||||
|
||||
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
|
||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
if (pQuery->intervalTimeUnit != 'n' && pQuery->intervalTimeUnit != 'y') {
|
||||
tw->skey += pQuery->slidingTime * factor;
|
||||
tw->ekey = tw->skey + pQuery->intervalTime - 1;
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t key = tw->skey;
|
||||
key /= 1000;
|
||||
if (pQuery->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
key /= 1000;
|
||||
}
|
||||
|
||||
struct tm tm;
|
||||
time_t t = (time_t)key;
|
||||
localtime_r(&t, &tm);
|
||||
|
||||
if (pQuery->intervalTimeUnit == 'y') {
|
||||
factor *= 12;
|
||||
}
|
||||
|
||||
int mon = tm.tm_year * 12 + tm.tm_mon;
|
||||
mon += pQuery->intervalTime * factor;
|
||||
tm.tm_year = mon / 12;
|
||||
tm.tm_mon = mon % 12;
|
||||
|
||||
tw->skey = mktime(&tm) * 1000L;
|
||||
|
||||
mon += pQuery->intervalTime * factor;
|
||||
tm.tm_year = mon / 12;
|
||||
tm.tm_mon = mon % 12;
|
||||
tw->ekey = mktime(&tm) * 1000L;
|
||||
|
||||
if (pQuery->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
tw->skey *= 1000L;
|
||||
tw->ekey *= 1000L;
|
||||
}
|
||||
tw->ekey -= 1;
|
||||
}
|
||||
|
||||
#define GET_NEXT_TIMEWINDOW(_q, tw) getNextTimeWindow((_q), (tw))
|
||||
|
||||
#define SET_STABLE_QUERY_OVER(_q) ((_q)->tableIndex = (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
|
||||
#define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
|
||||
|
@ -467,9 +529,13 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
|
|||
static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) {
|
||||
STimeWindow w = {0};
|
||||
|
||||
if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value
|
||||
if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value
|
||||
w.skey = pWindowResInfo->prevSKey;
|
||||
w.ekey = w.skey + pQuery->intervalTime - 1;
|
||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||
w.ekey = addNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||
} else {
|
||||
w.ekey = w.skey + pQuery->intervalTime - 1;
|
||||
}
|
||||
} else {
|
||||
int32_t slot = curTimeWindowIndex(pWindowResInfo);
|
||||
SWindowResult* pWindowRes = getWindowResult(pWindowResInfo, slot);
|
||||
|
@ -477,19 +543,24 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
|
|||
}
|
||||
|
||||
if (w.skey > ts || w.ekey < ts) {
|
||||
int64_t st = w.skey;
|
||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||
w.skey = taosGetIntervalStartTimestamp(ts, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
||||
w.ekey = addNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||
} else {
|
||||
int64_t st = w.skey;
|
||||
|
||||
if (st > ts) {
|
||||
st -= ((st - ts + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
|
||||
if (st > ts) {
|
||||
st -= ((st - ts + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
|
||||
}
|
||||
|
||||
int64_t et = st + pQuery->intervalTime - 1;
|
||||
if (et < ts) {
|
||||
st += ((ts - et + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
|
||||
}
|
||||
|
||||
w.skey = st;
|
||||
w.ekey = w.skey + pQuery->intervalTime - 1;
|
||||
}
|
||||
|
||||
int64_t et = st + pQuery->intervalTime - 1;
|
||||
if (et < ts) {
|
||||
st += ((ts - et + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
|
||||
}
|
||||
|
||||
w.skey = st;
|
||||
w.ekey = w.skey + pQuery->intervalTime - 1;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -814,14 +885,22 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
|
|||
*/
|
||||
if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNext->ekey) {
|
||||
TSKEY next = primaryKeys[startPos];
|
||||
|
||||
pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
|
||||
pNext->skey = pNext->ekey - pQuery->intervalTime + 1;
|
||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||
pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
||||
pNext->ekey = addNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||
} else {
|
||||
pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
|
||||
pNext->skey = pNext->ekey - pQuery->intervalTime + 1;
|
||||
}
|
||||
} else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNext->skey) {
|
||||
TSKEY next = primaryKeys[startPos];
|
||||
|
||||
pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
|
||||
pNext->ekey = pNext->skey + pQuery->intervalTime - 1;
|
||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||
pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
||||
pNext->ekey = addNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||
} else {
|
||||
pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
|
||||
pNext->ekey = pNext->skey + pQuery->intervalTime - 1;
|
||||
}
|
||||
}
|
||||
|
||||
return startPos;
|
||||
|
@ -1804,7 +1883,8 @@ void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int6
|
|||
if (keyFirst > (INT64_MAX - pQuery->intervalTime)) {
|
||||
assert(keyLast - keyFirst < pQuery->intervalTime);
|
||||
win->ekey = INT64_MAX;
|
||||
return;
|
||||
} else if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||
win->ekey = addNatualInterval(win->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||
} else {
|
||||
win->ekey = win->skey + pQuery->intervalTime - 1;
|
||||
}
|
||||
|
@ -6016,6 +6096,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
|||
pQuery->pGroupbyExpr = pGroupbyExpr;
|
||||
pQuery->intervalTime = pQueryMsg->intervalTime;
|
||||
pQuery->slidingTime = pQueryMsg->slidingTime;
|
||||
pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit;
|
||||
pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit;
|
||||
pQuery->fillType = pQueryMsg->fillType;
|
||||
pQuery->numOfTags = pQueryMsg->numOfTags;
|
||||
|
|
Loading…
Reference in New Issue