fix(query): do some internal refactor, and fix the bug when calender time duration exists in sliding and offset.
This commit is contained in:
parent
31c0ee3273
commit
0d9d2ea293
|
@ -74,7 +74,7 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) {
|
||||||
|
|
||||||
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
|
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
|
||||||
|
|
||||||
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
|
int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval);
|
||||||
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
|
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
|
||||||
|
|
||||||
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
|
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
|
||||||
|
|
|
@ -178,7 +178,9 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
|
||||||
|
|
||||||
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);
|
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);
|
||||||
|
|
||||||
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key);
|
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
|
||||||
|
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
|
||||||
|
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key);
|
||||||
/**
|
/**
|
||||||
* return the scan info, in the form of tuple of two items, including table uid and current timestamp
|
* return the scan info, in the form of tuple of two items, including table uid and current timestamp
|
||||||
* @param tinfo
|
* @param tinfo
|
||||||
|
|
|
@ -82,6 +82,7 @@ static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_
|
||||||
static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim);
|
static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim);
|
||||||
static char* forwardToTimeStringEnd(char* str);
|
static char* forwardToTimeStringEnd(char* str);
|
||||||
static bool checkTzPresent(const char* str, int32_t len);
|
static bool checkTzPresent(const char* str, int32_t len);
|
||||||
|
static int32_t parseTimezone(char* str, int64_t* tzOffset);
|
||||||
|
|
||||||
static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) = {
|
static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) = {
|
||||||
parseLocaltime, parseLocaltimeDst};
|
parseLocaltime, parseLocaltimeDst};
|
||||||
|
@ -708,21 +709,19 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati
|
||||||
return getDuration(*duration, *unit, duration, timePrecision);
|
return getDuration(*duration, *unit, duration, timePrecision);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define IS_CALENDAR_TIME_DURATION(_d) (((_d) == 'n') || ((_d) == 'y'))
|
||||||
|
|
||||||
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
|
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
|
||||||
if (duration == 0) {
|
if (duration == 0) {
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (unit != 'n' && unit != 'y') {
|
if (!IS_CALENDAR_TIME_DURATION(unit)) {
|
||||||
return t + duration;
|
return t + duration;
|
||||||
}
|
}
|
||||||
|
|
||||||
// The following code handles the y/n time duration
|
// The following code handles the y/n time duration
|
||||||
int64_t numOfMonth = duration;
|
int64_t numOfMonth = (unit == 'y')? duration*12:duration;
|
||||||
if (unit == 'y') {
|
|
||||||
numOfMonth *= 12;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t fraction = t % TSDB_TICK_PER_SECOND(precision);
|
int64_t fraction = t % TSDB_TICK_PER_SECOND(precision);
|
||||||
|
|
||||||
struct tm tm;
|
struct tm tm;
|
||||||
|
@ -764,13 +763,16 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char
|
||||||
return (emon - smon) / (int32_t)interval;
|
return (emon - smon) / (int32_t)interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision) {
|
int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) {
|
||||||
if (pInterval->sliding == 0 && pInterval->interval == 0) {
|
if (pInterval->sliding == 0 && pInterval->interval == 0) {
|
||||||
return t;
|
return ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t start = t;
|
int64_t start = ts;
|
||||||
if (pInterval->slidingUnit == 'n' || pInterval->slidingUnit == 'y') {
|
int32_t precision = pInterval->precision;
|
||||||
|
|
||||||
|
if (IS_CALENDAR_TIME_DURATION(pInterval->slidingUnit)) {
|
||||||
|
|
||||||
start /= (int64_t)(TSDB_TICK_PER_SECOND(precision));
|
start /= (int64_t)(TSDB_TICK_PER_SECOND(precision));
|
||||||
struct tm tm;
|
struct tm tm;
|
||||||
time_t tt = (time_t)start;
|
time_t tt = (time_t)start;
|
||||||
|
@ -792,44 +794,72 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
|
||||||
|
|
||||||
start = (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision));
|
start = (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision));
|
||||||
} else {
|
} else {
|
||||||
int64_t delta = t - pInterval->interval;
|
if (IS_CALENDAR_TIME_DURATION(pInterval->intervalUnit)) {
|
||||||
int32_t factor = (delta >= 0) ? 1 : -1;
|
int64_t news = (ts / pInterval->sliding) * pInterval->sliding;
|
||||||
|
ASSERT(news <= ts);
|
||||||
|
|
||||||
start = (delta / pInterval->sliding + factor) * pInterval->sliding;
|
if (news <= ts) {
|
||||||
|
int64_t prev = news;
|
||||||
|
int64_t newe = taosTimeAdd(news, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||||
|
|
||||||
if (pInterval->intervalUnit == 'd' || pInterval->intervalUnit == 'w') {
|
if (newe < ts) { // move towards the greater endpoint
|
||||||
/*
|
while(newe < ts && news < ts) {
|
||||||
* here we revised the start time of day according to the local time zone,
|
news += pInterval->sliding;
|
||||||
* but in case of DST, the start time of one day need to be dynamically decided.
|
newe = taosTimeAdd(news, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||||
*/
|
}
|
||||||
// todo refactor to extract function that is available for Linux/Windows/Mac platform
|
|
||||||
#if defined(WINDOWS) && _MSC_VER >= 1900
|
|
||||||
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
|
|
||||||
int64_t timezone = _timezone;
|
|
||||||
int32_t daylight = _daylight;
|
|
||||||
char** tzname = _tzname;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
start += (int64_t)(timezone * TSDB_TICK_PER_SECOND(precision));
|
prev = news;
|
||||||
}
|
|
||||||
|
|
||||||
int64_t end = 0;
|
|
||||||
|
|
||||||
// not enough time range
|
|
||||||
if (start < 0 || INT64_MAX - start > pInterval->interval - 1) {
|
|
||||||
end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
|
||||||
while (end < t) { // move forward to the correct time window
|
|
||||||
start += pInterval->sliding;
|
|
||||||
|
|
||||||
if (start < 0 || INT64_MAX - start > pInterval->interval - 1) {
|
|
||||||
end = start + pInterval->interval - 1;
|
|
||||||
} else {
|
} else {
|
||||||
end = INT64_MAX;
|
while (newe >= ts) {
|
||||||
break;
|
prev = news;
|
||||||
|
news -= pInterval->sliding;
|
||||||
|
newe = taosTimeAdd(news, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return prev;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
end = INT64_MAX;
|
int64_t delta = ts - pInterval->interval;
|
||||||
|
int32_t factor = (delta >= 0) ? 1 : -1;
|
||||||
|
|
||||||
|
start = (delta / pInterval->sliding + factor) * pInterval->sliding;
|
||||||
|
|
||||||
|
if (pInterval->intervalUnit == 'd' || pInterval->intervalUnit == 'w') {
|
||||||
|
/*
|
||||||
|
* here we revised the start time of day according to the local time zone,
|
||||||
|
* but in case of DST, the start time of one day need to be dynamically decided.
|
||||||
|
*/
|
||||||
|
// todo refactor to extract function that is available for Linux/Windows/Mac platform
|
||||||
|
#if defined(WINDOWS) && _MSC_VER >= 1900
|
||||||
|
// see
|
||||||
|
// https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
|
||||||
|
int64_t timezone = _timezone;
|
||||||
|
int32_t daylight = _daylight;
|
||||||
|
char** tzname = _tzname;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
start += (int64_t)(timezone * TSDB_TICK_PER_SECOND(precision));
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t end = 0;
|
||||||
|
|
||||||
|
// not enough time range
|
||||||
|
if (start < 0 || INT64_MAX - start > pInterval->interval - 1) {
|
||||||
|
end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||||
|
while (end < ts) { // move forward to the correct time window
|
||||||
|
start += pInterval->sliding;
|
||||||
|
|
||||||
|
if (start < 0 || INT64_MAX - start > pInterval->interval - 1) {
|
||||||
|
end = start + pInterval->interval - 1;
|
||||||
|
} else {
|
||||||
|
end = INT64_MAX;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
end = INT64_MAX;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -842,7 +872,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
|
||||||
int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||||
|
|
||||||
int64_t newEnd = end;
|
int64_t newEnd = end;
|
||||||
while (newEnd >= t) {
|
while (newEnd >= ts) {
|
||||||
end = newEnd;
|
end = newEnd;
|
||||||
newEnd = taosTimeAdd(newEnd, -pInterval->sliding, pInterval->slidingUnit, precision);
|
newEnd = taosTimeAdd(newEnd, -pInterval->sliding, pInterval->slidingUnit, precision);
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,4 +164,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
|
||||||
|
|
||||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||||
|
|
||||||
|
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
|
||||||
|
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
|
||||||
|
|
||||||
#endif // TDENGINE_QUERYUTIL_H
|
#endif // TDENGINE_QUERYUTIL_H
|
||||||
|
|
|
@ -854,7 +854,6 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
|
||||||
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
|
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
|
||||||
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
|
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
|
||||||
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
|
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
|
||||||
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
|
|
||||||
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
|
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
|
||||||
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
|
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
|
||||||
int32_t order, int64_t* pData);
|
int32_t order, int64_t* pData);
|
||||||
|
|
|
@ -1686,12 +1686,12 @@ int32_t convertFillType(int32_t mode) {
|
||||||
return type;
|
return type;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
|
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
|
||||||
if (ascQuery) {
|
if (ascQuery) {
|
||||||
*w = getAlignQueryTimeWindow(pInterval, pInterval->precision, ts);
|
*w = getAlignQueryTimeWindow(pInterval, ts);
|
||||||
} else {
|
} else {
|
||||||
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
|
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
|
||||||
*w = getAlignQueryTimeWindow(pInterval, pInterval->precision, ts);
|
*w = getAlignQueryTimeWindow(pInterval, ts);
|
||||||
|
|
||||||
int64_t key = w->skey;
|
int64_t key = w->skey;
|
||||||
while (key < ts) { // moving towards end
|
while (key < ts) { // moving towards end
|
||||||
|
@ -1708,7 +1708,7 @@ static void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindo
|
||||||
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
|
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
|
||||||
STimeWindow w = {0};
|
STimeWindow w = {0};
|
||||||
|
|
||||||
w.skey = taosTimeTruncate(ts, pInterval, pInterval->precision);
|
w.skey = taosTimeTruncate(ts, pInterval);
|
||||||
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||||
return w;
|
return w;
|
||||||
}
|
}
|
||||||
|
@ -1742,6 +1742,7 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
|
||||||
if (pRow) {
|
if (pRow) {
|
||||||
w = pRow->win;
|
w = pRow->win;
|
||||||
}
|
}
|
||||||
|
|
||||||
// in case of typical time window, we can calculate time window directly.
|
// in case of typical time window, we can calculate time window directly.
|
||||||
if (w.skey > ts || w.ekey < ts) {
|
if (w.skey > ts || w.ekey < ts) {
|
||||||
w = doCalculateTimeWindow(ts, pInterval);
|
w = doCalculateTimeWindow(ts, pInterval);
|
||||||
|
@ -1756,6 +1757,34 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
|
||||||
return w;
|
return w;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
|
||||||
|
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
|
||||||
|
if (pInterval->sliding != 'n' && pInterval->sliding != 'y') {
|
||||||
|
tw->skey += pInterval->sliding * factor;
|
||||||
|
tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// convert key to second
|
||||||
|
int64_t key = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;
|
||||||
|
|
||||||
|
int64_t duration = pInterval->sliding;
|
||||||
|
if (pInterval->slidingUnit == 'y') {
|
||||||
|
duration *= 12;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct tm tm;
|
||||||
|
time_t t = (time_t) key;
|
||||||
|
taosLocalTime(&t, &tm);
|
||||||
|
|
||||||
|
int mon = (int)(tm.tm_year * 12 + tm.tm_mon + duration * factor);
|
||||||
|
tm.tm_year = mon / 12;
|
||||||
|
tm.tm_mon = mon % 12;
|
||||||
|
tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
|
||||||
|
|
||||||
|
tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||||
|
}
|
||||||
|
|
||||||
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
|
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
|
||||||
return (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1 || pLimitInfo->slimit.limit != -1 ||
|
return (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1 || pLimitInfo->slimit.limit != -1 ||
|
||||||
pLimitInfo->slimit.offset != -1);
|
pLimitInfo->slimit.offset != -1);
|
||||||
|
|
|
@ -637,15 +637,15 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code) ? tr
|
||||||
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; }
|
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; }
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
|
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) {
|
||||||
STimeWindow win = {0};
|
STimeWindow win = {0};
|
||||||
win.skey = taosTimeTruncate(key, pInterval, precision);
|
win.skey = taosTimeTruncate(key, pInterval);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* if the realSkey > INT64_MAX - pInterval->interval, the query duration between
|
* if the realSkey > INT64_MAX - pInterval->interval, the query duration between
|
||||||
* realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
|
* realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
|
||||||
*/
|
*/
|
||||||
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||||
if (win.ekey < win.skey) {
|
if (win.ekey < win.skey) {
|
||||||
win.ekey = INT64_MAX;
|
win.ekey = INT64_MAX;
|
||||||
}
|
}
|
||||||
|
|
|
@ -255,10 +255,10 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
||||||
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
|
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
|
||||||
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
|
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
|
||||||
|
|
||||||
|
STimeWindow w = {0};
|
||||||
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
|
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
|
||||||
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
|
|
||||||
w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);
|
|
||||||
|
|
||||||
|
getInitialStartTimeWindow(pInterval, startKey, &w, order);
|
||||||
pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
||||||
pInfo->primaryTsCol, order, id);
|
pInfo->primaryTsCol, order, id);
|
||||||
|
|
||||||
|
@ -400,13 +400,13 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
|
|
||||||
TSKEY getNextWindowTs(TSKEY ts, SInterval* pInterval) {
|
TSKEY getNextWindowTs(TSKEY ts, SInterval* pInterval) {
|
||||||
STimeWindow win = {.skey = ts, .ekey = ts};
|
STimeWindow win = {.skey = ts, .ekey = ts};
|
||||||
getNextIntervalWindow(pInterval, &win, TSDB_ORDER_ASC);
|
getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
|
||||||
return win.skey;
|
return win.skey;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY getPrevWindowTs(TSKEY ts, SInterval* pInterval) {
|
TSKEY getPrevWindowTs(TSKEY ts, SInterval* pInterval) {
|
||||||
STimeWindow win = {.skey = ts, .ekey = ts};
|
STimeWindow win = {.skey = ts, .ekey = ts};
|
||||||
getNextIntervalWindow(pInterval, &win, TSDB_ORDER_DESC);
|
getNextTimeWindow(pInterval, &win, TSDB_ORDER_DESC);
|
||||||
return win.skey;
|
return win.skey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -67,39 +67,6 @@ static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
|
|
||||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
|
|
||||||
if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
|
|
||||||
tw->skey += pInterval->sliding * factor;
|
|
||||||
tw->ekey = tw->skey + pInterval->interval - 1;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t key = tw->skey, interval = pInterval->interval;
|
|
||||||
// convert key to second
|
|
||||||
key = convertTimePrecision(key, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;
|
|
||||||
|
|
||||||
if (pInterval->intervalUnit == 'y') {
|
|
||||||
interval *= 12;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct tm tm;
|
|
||||||
time_t t = (time_t)key;
|
|
||||||
taosLocalTime(&t, &tm);
|
|
||||||
|
|
||||||
int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
|
|
||||||
tm.tm_year = mon / 12;
|
|
||||||
tm.tm_mon = mon % 12;
|
|
||||||
tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
|
|
||||||
|
|
||||||
mon = (int)(mon + interval);
|
|
||||||
tm.tm_year = mon / 12;
|
|
||||||
tm.tm_mon = mon % 12;
|
|
||||||
tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
|
|
||||||
|
|
||||||
tw->ekey -= 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
|
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
|
||||||
STimeWindow w = {0};
|
STimeWindow w = {0};
|
||||||
|
|
||||||
|
@ -109,7 +76,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
||||||
}
|
}
|
||||||
|
|
||||||
if (order == TSDB_ORDER_ASC) {
|
if (order == TSDB_ORDER_ASC) {
|
||||||
w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
|
w = getAlignQueryTimeWindow(pInterval, pBlockInfo->window.skey);
|
||||||
ASSERT(w.ekey >= pBlockInfo->window.skey);
|
ASSERT(w.ekey >= pBlockInfo->window.skey);
|
||||||
|
|
||||||
if (w.ekey < pBlockInfo->window.ekey) {
|
if (w.ekey < pBlockInfo->window.ekey) {
|
||||||
|
@ -128,7 +95,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
|
w = getAlignQueryTimeWindow(pInterval, pBlockInfo->window.ekey);
|
||||||
ASSERT(w.skey <= pBlockInfo->window.ekey);
|
ASSERT(w.skey <= pBlockInfo->window.ekey);
|
||||||
|
|
||||||
if (w.skey > pBlockInfo->window.skey) {
|
if (w.skey > pBlockInfo->window.skey) {
|
||||||
|
|
|
@ -519,7 +519,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
|
||||||
|
|
||||||
pFillInfo->end = endKey;
|
pFillInfo->end = endKey;
|
||||||
if (!FILL_IS_ASC_FILL(pFillInfo)) {
|
if (!FILL_IS_ASC_FILL(pFillInfo)) {
|
||||||
pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->interval.precision);
|
pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval);
|
||||||
}
|
}
|
||||||
|
|
||||||
pFillInfo->index = 0;
|
pFillInfo->index = 0;
|
||||||
|
|
|
@ -263,43 +263,6 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t order, STimeWindow* tw) {
|
|
||||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
|
|
||||||
if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
|
|
||||||
tw->skey += pInterval->sliding * factor;
|
|
||||||
tw->ekey = tw->skey + pInterval->interval - 1;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t key = tw->skey, interval = pInterval->interval;
|
|
||||||
// convert key to second
|
|
||||||
key = convertTimePrecision(key, precision, TSDB_TIME_PRECISION_MILLI) / 1000;
|
|
||||||
|
|
||||||
if (pInterval->intervalUnit == 'y') {
|
|
||||||
interval *= 12;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct tm tm;
|
|
||||||
time_t t = (time_t)key;
|
|
||||||
taosLocalTime(&t, &tm);
|
|
||||||
|
|
||||||
int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
|
|
||||||
tm.tm_year = mon / 12;
|
|
||||||
tm.tm_mon = mon % 12;
|
|
||||||
tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, precision);
|
|
||||||
|
|
||||||
mon = (int)(mon + interval);
|
|
||||||
tm.tm_year = mon / 12;
|
|
||||||
tm.tm_mon = mon % 12;
|
|
||||||
tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, precision);
|
|
||||||
|
|
||||||
tw->ekey -= 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
|
|
||||||
getNextTimeWindow(pInterval, pInterval->precision, order, tw);
|
|
||||||
}
|
|
||||||
|
|
||||||
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
|
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
|
||||||
int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
|
int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
|
||||||
SqlFunctionCtx* pCtx = pSup->pCtx;
|
SqlFunctionCtx* pCtx = pSup->pCtx;
|
||||||
|
@ -455,7 +418,7 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext,
|
||||||
bool ascQuery = (order == TSDB_ORDER_ASC);
|
bool ascQuery = (order == TSDB_ORDER_ASC);
|
||||||
|
|
||||||
int32_t precision = pInterval->precision;
|
int32_t precision = pInterval->precision;
|
||||||
getNextTimeWindow(pInterval, precision, order, pNext);
|
getNextTimeWindow(pInterval, pNext, order);
|
||||||
|
|
||||||
// next time window is not in current block
|
// next time window is not in current block
|
||||||
if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
|
if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
|
||||||
|
@ -500,7 +463,7 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext,
|
||||||
if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
|
if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
|
||||||
TSKEY next = primaryKeys[startPos];
|
TSKEY next = primaryKeys[startPos];
|
||||||
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
|
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
|
||||||
pNext->skey = taosTimeTruncate(next, pInterval, precision);
|
pNext->skey = taosTimeTruncate(next, pInterval);
|
||||||
pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||||
} else {
|
} else {
|
||||||
pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
||||||
|
@ -509,7 +472,7 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext,
|
||||||
} else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
|
} else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
|
||||||
TSKEY next = primaryKeys[startPos];
|
TSKEY next = primaryKeys[startPos];
|
||||||
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
|
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
|
||||||
pNext->skey = taosTimeTruncate(next, pInterval, precision);
|
pNext->skey = taosTimeTruncate(next, pInterval);
|
||||||
pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||||
} else {
|
} else {
|
||||||
pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
||||||
|
@ -1380,7 +1343,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
|
||||||
|
|
||||||
do {
|
do {
|
||||||
if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i])) {
|
if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i])) {
|
||||||
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
|
getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
uint64_t winGpId = pGpDatas[i];
|
uint64_t winGpId = pGpDatas[i];
|
||||||
|
@ -1397,7 +1360,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
|
||||||
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
|
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
|
getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
|
||||||
} while (win.ekey <= endTsCols[i]);
|
} while (win.ekey <= endTsCols[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1247,8 +1247,8 @@ static bool smaIndexOptEqualInterval(SScanLogicNode* pScan, SWindowLogicNode* pW
|
||||||
.sliding = pIndex->sliding,
|
.sliding = pIndex->sliding,
|
||||||
.slidingUnit = pIndex->slidingUnit,
|
.slidingUnit = pIndex->slidingUnit,
|
||||||
.precision = pScan->node.precision};
|
.precision = pScan->node.precision};
|
||||||
return (pScan->scanRange.skey == taosTimeTruncate(pScan->scanRange.skey, &interval, pScan->node.precision)) &&
|
return (pScan->scanRange.skey == taosTimeTruncate(pScan->scanRange.skey, &interval)) &&
|
||||||
(pScan->scanRange.ekey + 1 == taosTimeTruncate(pScan->scanRange.ekey + 1, &interval, pScan->node.precision));
|
(pScan->scanRange.ekey + 1 == taosTimeTruncate(pScan->scanRange.ekey + 1, &interval));
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue