feat:[TD-32642] add timezone for connection support
This commit is contained in:
parent
375c3c4873
commit
ea9c5d3ce6
|
@ -60,7 +60,7 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
|
|||
*/
|
||||
int64_t taosGetTimestampToday(int32_t precision, timezone_t tz);
|
||||
|
||||
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
|
||||
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision, timezone_t tz);
|
||||
|
||||
int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval);
|
||||
int64_t taosTimeGetIntervalEnd(int64_t ts, const SInterval* pInterval);
|
||||
|
|
|
@ -296,6 +296,12 @@ struct SScalarParam {
|
|||
void *charsetCxt;
|
||||
};
|
||||
|
||||
static inline void setTzCharset(SScalarParam* param, timezone_t tz, void* charsetCxt){
|
||||
if (param == NULL) return;
|
||||
param->tz = tz;
|
||||
param->charsetCxt = charsetCxt;
|
||||
}
|
||||
|
||||
#define cleanupResultRowEntry(p) p->initialized = false
|
||||
#define isRowEntryCompleted(p) (p->complete)
|
||||
#define isRowEntryInitialized(p) (p->initialized)
|
||||
|
|
|
@ -623,7 +623,7 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati
|
|||
|
||||
static bool taosIsLeapYear(int32_t year) { return (year % 4 == 0 && (year % 100 != 0 || year % 400 == 0)); }
|
||||
|
||||
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
|
||||
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision, timezone_t tz) {
|
||||
if (duration == 0) {
|
||||
return t;
|
||||
}
|
||||
|
@ -638,7 +638,7 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
|
|||
|
||||
struct tm tm;
|
||||
time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision));
|
||||
if(taosGmTimeR(&tt, &tm) == NULL) {
|
||||
if(taosLocalTime(&tt, &tm, NULL, 0, tz) == NULL) {
|
||||
uError("failed to convert time to gm time, code:%d", errno);
|
||||
return t;
|
||||
}
|
||||
|
@ -652,7 +652,8 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
|
|||
if (tm.tm_mday > daysOfMonth[tm.tm_mon]) {
|
||||
tm.tm_mday = daysOfMonth[tm.tm_mon];
|
||||
}
|
||||
tt = taosTimeGm(&tm);
|
||||
|
||||
tt = taosMktime(&tm, tz);
|
||||
if (tt == -1){
|
||||
uError("failed to convert gm time to time, code:%d", errno);
|
||||
return t;
|
||||
|
@ -778,12 +779,12 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) {
|
|||
|
||||
if (news <= ts) {
|
||||
int64_t prev = news;
|
||||
int64_t newe = taosTimeAdd(news, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||
int64_t newe = taosTimeAdd(news, pInterval->interval, pInterval->intervalUnit, precision, NULL) - 1;
|
||||
|
||||
if (newe < ts) { // move towards the greater endpoint
|
||||
while (newe < ts && news < ts) {
|
||||
news += pInterval->sliding;
|
||||
newe = taosTimeAdd(news, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||
newe = taosTimeAdd(news, pInterval->interval, pInterval->intervalUnit, precision, NULL) - 1;
|
||||
}
|
||||
|
||||
prev = news;
|
||||
|
@ -791,7 +792,7 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) {
|
|||
while (newe >= ts) {
|
||||
prev = news;
|
||||
news -= pInterval->sliding;
|
||||
newe = taosTimeAdd(news, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||
newe = taosTimeAdd(news, pInterval->interval, pInterval->intervalUnit, precision, NULL) - 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -824,7 +825,7 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) {
|
|||
|
||||
// not enough time range
|
||||
if (start < 0 || INT64_MAX - start > pInterval->interval - 1) {
|
||||
end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||
end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision, NULL) - 1;
|
||||
while (end < ts) { // move forward to the correct time window
|
||||
start += pInterval->sliding;
|
||||
|
||||
|
@ -843,15 +844,15 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) {
|
|||
|
||||
if (pInterval->offset > 0) {
|
||||
// try to move current window to the left-hande-side, due to the offset effect.
|
||||
int64_t newe = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||
int64_t newe = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision, NULL) - 1;
|
||||
int64_t slidingStart = start;
|
||||
while (newe >= ts) {
|
||||
start = slidingStart;
|
||||
slidingStart = taosTimeAdd(slidingStart, -pInterval->sliding, pInterval->slidingUnit, precision);
|
||||
int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||
newe = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, precision);
|
||||
slidingStart = taosTimeAdd(slidingStart, -pInterval->sliding, pInterval->slidingUnit, precision, NULL);
|
||||
int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, precision, NULL) - 1;
|
||||
newe = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, precision, NULL);
|
||||
}
|
||||
start = taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision);
|
||||
start = taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision, NULL);
|
||||
}
|
||||
|
||||
return start;
|
||||
|
@ -861,12 +862,12 @@ int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) {
|
|||
int64_t taosTimeGetIntervalEnd(int64_t intervalStart, const SInterval* pInterval) {
|
||||
if (pInterval->offset > 0) {
|
||||
int64_t slideStart =
|
||||
taosTimeAdd(intervalStart, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision);
|
||||
int64_t slideEnd = taosTimeAdd(slideStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||
int64_t result = taosTimeAdd(slideEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
|
||||
taosTimeAdd(intervalStart, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
|
||||
int64_t slideEnd = taosTimeAdd(slideStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
|
||||
int64_t result = taosTimeAdd(slideEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
|
||||
return result;
|
||||
} else {
|
||||
int64_t result = taosTimeAdd(intervalStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||
int64_t result = taosTimeAdd(intervalStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2406,7 +2406,7 @@ void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, b
|
|||
|
||||
int64_t key = w->skey;
|
||||
while (key < ts) { // moving towards end
|
||||
key = taosTimeAdd(key, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||
key = taosTimeAdd(key, pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
|
||||
if (key > ts) {
|
||||
break;
|
||||
}
|
||||
|
@ -2431,8 +2431,8 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
|
|||
STimeWindow save = win;
|
||||
while (win.skey <= ts && win.ekey >= ts) {
|
||||
save = win;
|
||||
win.skey = taosTimeAdd(win.skey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||
win.ekey = taosTimeAdd(win.ekey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||
win.skey = taosTimeAdd(win.skey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
|
||||
win.ekey = taosTimeAdd(win.ekey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
|
||||
}
|
||||
|
||||
return save;
|
||||
|
@ -2471,16 +2471,16 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
|
|||
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
|
||||
int64_t slidingStart = 0;
|
||||
if (pInterval->offset > 0) {
|
||||
slidingStart = taosTimeAdd(tw->skey, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision);
|
||||
slidingStart = taosTimeAdd(tw->skey, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
|
||||
} else {
|
||||
slidingStart = tw->skey;
|
||||
}
|
||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
|
||||
slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||
tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
|
||||
slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
|
||||
tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
|
||||
int64_t slidingEnd =
|
||||
taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||
tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision);
|
||||
taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
|
||||
tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
|
||||
}
|
||||
|
||||
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
|
||||
|
|
|
@ -560,7 +560,7 @@ static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) {
|
|||
next = ekey;
|
||||
while (next < pInfo->win.ekey) {
|
||||
next = taosTimeAdd(ekey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
|
||||
pInfo->pFillInfo->interval.precision);
|
||||
pInfo->pFillInfo->interval.precision, NULL);
|
||||
ekey = next > pInfo->win.ekey ? ekey : next;
|
||||
}
|
||||
pInfo->win.ekey = ekey;
|
||||
|
@ -569,7 +569,7 @@ static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) {
|
|||
next = skey;
|
||||
while (next < pInfo->win.skey) {
|
||||
next = taosTimeAdd(skey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
|
||||
pInfo->pFillInfo->interval.precision);
|
||||
pInfo->pFillInfo->interval.precision, NULL);
|
||||
skey = next > pInfo->win.skey ? skey : next;
|
||||
}
|
||||
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
|
||||
|
|
|
@ -270,12 +270,12 @@ static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillCo
|
|||
}
|
||||
|
||||
static void setFillInfoStart(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
|
||||
ts = taosTimeAdd(ts, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||
ts = taosTimeAdd(ts, pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
|
||||
pFillInfo->start = ts;
|
||||
}
|
||||
|
||||
static void setFillInfoEnd(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
|
||||
ts = taosTimeAdd(ts, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision);
|
||||
ts = taosTimeAdd(ts, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision, NULL);
|
||||
pFillInfo->end = ts;
|
||||
}
|
||||
|
||||
|
@ -292,7 +292,7 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS
|
|||
}
|
||||
|
||||
TSKEY realStart = taosTimeAdd(pFillSup->prev.key, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
||||
pFillSup->interval.precision);
|
||||
pFillSup->interval.precision, NULL);
|
||||
|
||||
pFillInfo->needFill = true;
|
||||
pFillInfo->start = realStart;
|
||||
|
@ -531,7 +531,7 @@ static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo*
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
||||
pFillSup->interval.precision);
|
||||
pFillSup->interval.precision, NULL);
|
||||
}
|
||||
|
||||
_end:
|
||||
|
@ -553,7 +553,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
|
|||
|
||||
if ((pFillSup->hasDelete && !ckRes) || !inWinRange(&pFillSup->winRange, &st)) {
|
||||
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
||||
pFillSup->interval.precision);
|
||||
pFillSup->interval.precision, NULL);
|
||||
pFillInfo->pLinearInfo->winIndex++;
|
||||
continue;
|
||||
}
|
||||
|
@ -599,7 +599,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
|
|||
}
|
||||
}
|
||||
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
||||
pFillSup->interval.precision);
|
||||
pFillSup->interval.precision, NULL);
|
||||
pBlock->info.rows++;
|
||||
}
|
||||
|
||||
|
@ -1277,7 +1277,7 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
resTs = taosTimeAdd(resTs, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
||||
pFillSup->interval.precision);
|
||||
pFillSup->interval.precision, NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -444,7 +444,7 @@ static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
// }
|
||||
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
||||
pFillSup->interval.precision);
|
||||
pFillSup->interval.precision, NULL);
|
||||
}
|
||||
|
||||
_end:
|
||||
|
@ -504,7 +504,7 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
|
|||
}
|
||||
}
|
||||
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
||||
pFillSup->interval.precision);
|
||||
pFillSup->interval.precision, NULL);
|
||||
if (ckRes) {
|
||||
pBlock->info.rows++;
|
||||
}
|
||||
|
@ -524,14 +524,14 @@ static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStream
|
|||
|
||||
static TSKEY adustPrevTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) {
|
||||
if (rowTs >= pointTs) {
|
||||
pointTs = taosTimeAdd(pointTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||
pointTs = taosTimeAdd(pointTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
|
||||
}
|
||||
return pointTs;
|
||||
}
|
||||
|
||||
static TSKEY adustEndTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) {
|
||||
if (rowTs <= pointTs) {
|
||||
pointTs = taosTimeAdd(pointTs, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision);
|
||||
pointTs = taosTimeAdd(pointTs, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision, NULL);
|
||||
}
|
||||
return pointTs;
|
||||
}
|
||||
|
@ -927,7 +927,7 @@ static int32_t getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSu
|
|||
}
|
||||
} else {
|
||||
pNextPoint->key.ts = taosTimeAdd(pCurPoint->key.ts, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
||||
pFillSup->interval.precision);
|
||||
pFillSup->interval.precision, NULL);
|
||||
code = pAggSup->stateStore.streamStateFillAddIfNotExist(pState, &pNextPoint->key, (void**)&pNextPoint->pResPos,
|
||||
&nextVLen, &tmpRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
|
|
@ -358,7 +358,7 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp
|
|||
void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
|
||||
STimeWindow win = {
|
||||
.skey = pWinKey->ts,
|
||||
.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1,
|
||||
.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1,
|
||||
};
|
||||
if (isCloseWindow(&win, pTwSup)) {
|
||||
if (chIds && pPullDataMap) {
|
||||
|
@ -391,7 +391,7 @@ _end:
|
|||
|
||||
STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
|
||||
STimeWindow w = {.skey = ts, .ekey = INT64_MAX};
|
||||
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
|
||||
return w;
|
||||
}
|
||||
|
||||
|
@ -851,7 +851,7 @@ static int32_t processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pF
|
|||
}
|
||||
}
|
||||
}
|
||||
winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||
winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
|
||||
}
|
||||
}
|
||||
if (pBeOver) {
|
||||
|
|
|
@ -143,7 +143,7 @@ bool fillIfWindowPseudoColumn(SFillInfo* pFillInfo, SFillColInfo* pCol, SColumnI
|
|||
// TODO: include endpoint
|
||||
SInterval* pInterval = &pFillInfo->interval;
|
||||
int64_t windowEnd =
|
||||
taosTimeAdd(pFillInfo->currentKey, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||
taosTimeAdd(pFillInfo->currentKey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL);
|
||||
code = colDataSetVal(pDstColInfoData, rowIndex, (const char*)&windowEnd, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
return true;
|
||||
|
@ -264,7 +264,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
|||
// setTagsValue(pFillInfo, data, index);
|
||||
SInterval* pInterval = &pFillInfo->interval;
|
||||
pFillInfo->currentKey =
|
||||
taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
|
||||
taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision, NULL);
|
||||
pBlock->info.rows += 1;
|
||||
pFillInfo->numOfCurrent++;
|
||||
|
||||
|
@ -484,7 +484,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
// set the tag value for final result
|
||||
SInterval* pInterval = &pFillInfo->interval;
|
||||
pFillInfo->currentKey =
|
||||
taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
|
||||
taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision, NULL);
|
||||
|
||||
pBlock->info.rows += 1;
|
||||
pFillInfo->index += 1;
|
||||
|
|
|
@ -833,7 +833,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
|||
doKeepLinearInfo(pSliceInfo, pBlock, i);
|
||||
|
||||
pSliceInfo->current =
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL);
|
||||
|
||||
if (checkWindowBoundReached(pSliceInfo)) {
|
||||
break;
|
||||
|
@ -859,7 +859,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
|||
break;
|
||||
} else {
|
||||
pSliceInfo->current =
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -887,7 +887,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
|||
break;
|
||||
} else {
|
||||
pSliceInfo->current =
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -897,7 +897,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pSliceInfo->current =
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL);
|
||||
}
|
||||
doKeepPrevRows(pSliceInfo, pBlock, i);
|
||||
|
||||
|
@ -935,7 +935,7 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato
|
|||
while (pSliceInfo->current <= pSliceInfo->win.ekey) {
|
||||
(void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo);
|
||||
pSliceInfo->current =
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1919,7 +1919,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
|
|||
|
||||
STimeWindow win = {0};
|
||||
win.skey = miaInfo->curTs;
|
||||
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
|
||||
|
||||
int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
|
||||
if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
|
||||
|
@ -1946,7 +1946,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
|
|||
miaInfo->curTs = tsCols[currPos];
|
||||
|
||||
currWin.skey = miaInfo->curTs;
|
||||
currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||
currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
|
||||
|
||||
startPos = currPos;
|
||||
ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
|
||||
|
|
|
@ -11820,7 +11820,7 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, SSDa
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (interval.interval > 0) {
|
||||
pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit,
|
||||
interval.precision);
|
||||
interval.precision, NULL);
|
||||
} else {
|
||||
pStmt->pReq->lastTs = lastTs + 1; // start key of the next time window
|
||||
}
|
||||
|
@ -12720,7 +12720,7 @@ int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSData
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (interval.interval > 0) {
|
||||
pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit,
|
||||
interval.precision);
|
||||
interval.precision, NULL);
|
||||
} else {
|
||||
pStmt->pReq->lastTs = lastTs + 1; // start key of the next time window
|
||||
}
|
||||
|
|
|
@ -6965,7 +6965,7 @@ static int32_t tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow*
|
|||
if (pScanRange->skey != TSKEY_MIN) {
|
||||
startOfSkeyFirstWin = taosTimeTruncate(pScanRange->skey, pInterval);
|
||||
endOfSkeyFirstWin =
|
||||
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision);
|
||||
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision, NULL);
|
||||
isSkeyAlignedWithTsma = taosTimeTruncate(pScanRange->skey, &tsmaInterval) == pScanRange->skey;
|
||||
} else {
|
||||
endOfSkeyFirstWin = TSKEY_MIN;
|
||||
|
@ -6975,7 +6975,7 @@ static int32_t tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow*
|
|||
if (pScanRange->ekey != TSKEY_MAX) {
|
||||
startOfEkeyFirstWin = taosTimeTruncate(pScanRange->ekey, pInterval);
|
||||
endOfEkeyFirstWin =
|
||||
taosTimeAdd(startOfEkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision);
|
||||
taosTimeAdd(startOfEkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision, NULL);
|
||||
isEkeyAlignedWithTsma = taosTimeTruncate(pScanRange->ekey + 1, &tsmaInterval) == (pScanRange->ekey + 1);
|
||||
if (startOfEkeyFirstWin > startOfSkeyFirstWin) {
|
||||
needTailWindow = true;
|
||||
|
@ -6986,7 +6986,7 @@ static int32_t tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow*
|
|||
if (!isSkeyAlignedWithTsma) {
|
||||
scanRange.ekey = TMIN(
|
||||
scanRange.ekey,
|
||||
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval * 1, pInterval->intervalUnit, pTsmaOptCtx->precision) - 1);
|
||||
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval * 1, pInterval->intervalUnit, pTsmaOptCtx->precision, NULL) - 1);
|
||||
const STSMAOptUsefulTsma* pTsmaFound =
|
||||
tsmaOptFindUsefulTsma(pTsmaOptCtx->pUsefulTsmas, 1, scanRange.skey, scanRange.ekey + 1, pTsmaOptCtx->precision);
|
||||
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsmaFound ? pTsmaFound->pTsma : NULL,
|
||||
|
|
|
@ -80,11 +80,8 @@ int32_t sclConvertValueToSclParam(SValueNode *pValueNode, SScalarParam *out, int
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
in.tz = pValueNode->tz;
|
||||
in.charsetCxt = pValueNode->charsetCxt;
|
||||
out->tz = pValueNode->tz;
|
||||
out->charsetCxt = pValueNode->charsetCxt;
|
||||
setTzCharset(&in, pValueNode->tz, pValueNode->charsetCxt);
|
||||
setTzCharset(out, pValueNode->tz, pValueNode->charsetCxt);
|
||||
code = vectorConvertSingleColImpl(&in, out, overflow, -1, -1);
|
||||
|
||||
_exit:
|
||||
|
@ -590,11 +587,11 @@ int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScal
|
|||
SCL_ERR_JRET(sclSetOperatorValueType(node, ctx));
|
||||
|
||||
SCL_ERR_JRET(sclInitParam(node->pLeft, ¶mList[0], ctx, rowNum));
|
||||
paramList[0].tz = node->tz;
|
||||
paramList[0].charsetCxt = node->charsetCxt;
|
||||
setTzCharset(¶mList[0], node->tz, node->charsetCxt);
|
||||
if (paramNum > 1) {
|
||||
TSWAP(ctx->type.selfType, ctx->type.peerType);
|
||||
SCL_ERR_JRET(sclInitParam(node->pRight, ¶mList[1], ctx, rowNum));
|
||||
setTzCharset(¶mList[1], node->tz, node->charsetCxt);
|
||||
}
|
||||
|
||||
*pParams = paramList;
|
||||
|
@ -762,8 +759,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
|
|||
int32_t paramNum = 0;
|
||||
int32_t code = 0;
|
||||
SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum));
|
||||
params->tz = node->tz;
|
||||
params->charsetCxt = node->charsetCxt;
|
||||
setTzCharset(params, node->tz, node->charsetCxt);
|
||||
|
||||
if (fmIsUserDefinedFunc(node->funcId)) {
|
||||
code = callUdfScalarFunc(node->functionName, params, paramNum, output);
|
||||
|
@ -966,10 +962,12 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp
|
|||
sclError("invalid when/then in whenThen list");
|
||||
SCL_ERR_JRET(TSDB_CODE_INVALID_PARA);
|
||||
}
|
||||
|
||||
setTzCharset(pCase, node->tz, node->charsetCxt);
|
||||
setTzCharset(pWhen, node->tz, node->charsetCxt);
|
||||
setTzCharset(pThen, node->tz, node->charsetCxt);
|
||||
setTzCharset(pElse, node->tz, node->charsetCxt);
|
||||
setTzCharset(output, node->tz, node->charsetCxt);
|
||||
if (pCase) {
|
||||
pCase->tz = node->tz;
|
||||
pCase->charsetCxt = node->charsetCxt;
|
||||
SCL_ERR_JRET(vectorCompare(pCase, pWhen, &comp, TSDB_ORDER_ASC, OP_TYPE_EQUAL));
|
||||
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
|
|
|
@ -1191,7 +1191,7 @@ static int32_t vectorMathAddHelper(SColumnInfoData *pLeftCol, SColumnInfoData *p
|
|||
}
|
||||
|
||||
static int32_t vectorMathTsAddHelper(SColumnInfoData *pLeftCol, SColumnInfoData *pRightCol, SColumnInfoData *pOutputCol,
|
||||
int32_t numOfRows, int32_t step, int32_t i) {
|
||||
int32_t numOfRows, int32_t step, int32_t i, timezone_t tz) {
|
||||
_getBigintValue_fn_t getVectorBigintValueFnLeft;
|
||||
_getBigintValue_fn_t getVectorBigintValueFnRight;
|
||||
SCL_ERR_RET(getVectorBigintValueFn(pLeftCol->info.type, &getVectorBigintValueFnLeft));
|
||||
|
@ -1211,7 +1211,7 @@ static int32_t vectorMathTsAddHelper(SColumnInfoData *pLeftCol, SColumnInfoData
|
|||
SCL_ERR_RET(getVectorBigintValueFnLeft(pLeftCol->pData, i, &leftRes));
|
||||
SCL_ERR_RET(getVectorBigintValueFnRight(pRightCol->pData, 0, &rightRes));
|
||||
*output =
|
||||
taosTimeAdd(leftRes, rightRes, pRightCol->info.scale, pRightCol->info.precision);
|
||||
taosTimeAdd(leftRes, rightRes, pRightCol->info.scale, pRightCol->info.precision, tz);
|
||||
}
|
||||
}
|
||||
SCL_RET(TSDB_CODE_SUCCESS);
|
||||
|
@ -1275,14 +1275,14 @@ int32_t vectorMathAdd(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *p
|
|||
|
||||
if (pLeft->numOfRows == 1 && pRight->numOfRows == 1) {
|
||||
if (GET_PARAM_TYPE(pLeft) == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
SCL_ERR_JRET(vectorMathTsAddHelper(pLeftCol, pRightCol, pOutputCol, pRight->numOfRows, step, i));
|
||||
SCL_ERR_JRET(vectorMathTsAddHelper(pLeftCol, pRightCol, pOutputCol, pRight->numOfRows, step, i, pLeft->tz));
|
||||
} else {
|
||||
SCL_ERR_JRET(vectorMathTsAddHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, i));
|
||||
SCL_ERR_JRET(vectorMathTsAddHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, i, pLeft->tz));
|
||||
}
|
||||
} else if (pLeft->numOfRows == 1) {
|
||||
SCL_ERR_JRET(vectorMathTsAddHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, i));
|
||||
SCL_ERR_JRET(vectorMathTsAddHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, i, pLeft->tz));
|
||||
} else if (pRight->numOfRows == 1) {
|
||||
SCL_ERR_JRET(vectorMathTsAddHelper(pLeftCol, pRightCol, pOutputCol, pLeft->numOfRows, step, i));
|
||||
SCL_ERR_JRET(vectorMathTsAddHelper(pLeftCol, pRightCol, pOutputCol, pLeft->numOfRows, step, i, pLeft->tz));
|
||||
} else if (pLeft->numOfRows == pRight->numOfRows) {
|
||||
for (; i < pRight->numOfRows && i >= 0; i += step, output += 1) {
|
||||
if (IS_NULL) {
|
||||
|
@ -1356,7 +1356,7 @@ static int32_t vectorMathSubHelper(SColumnInfoData *pLeftCol, SColumnInfoData *p
|
|||
}
|
||||
|
||||
static int32_t vectorMathTsSubHelper(SColumnInfoData *pLeftCol, SColumnInfoData *pRightCol, SColumnInfoData *pOutputCol,
|
||||
int32_t numOfRows, int32_t step, int32_t factor, int32_t i) {
|
||||
int32_t numOfRows, int32_t step, int32_t factor, int32_t i, timezone_t tz) {
|
||||
_getBigintValue_fn_t getVectorBigintValueFnLeft;
|
||||
_getBigintValue_fn_t getVectorBigintValueFnRight;
|
||||
SCL_ERR_RET(getVectorBigintValueFn(pLeftCol->info.type, &getVectorBigintValueFnLeft));
|
||||
|
@ -1377,7 +1377,7 @@ static int32_t vectorMathTsSubHelper(SColumnInfoData *pLeftCol, SColumnInfoData
|
|||
SCL_ERR_RET(getVectorBigintValueFnLeft(pLeftCol->pData, i, &leftRes));
|
||||
SCL_ERR_RET(getVectorBigintValueFnRight(pRightCol->pData, 0, &rightRes));
|
||||
*output =
|
||||
taosTimeAdd(leftRes, -rightRes, pRightCol->info.scale, pRightCol->info.precision);
|
||||
taosTimeAdd(leftRes, -rightRes, pRightCol->info.scale, pRightCol->info.precision, tz);
|
||||
}
|
||||
}
|
||||
SCL_RET(TSDB_CODE_SUCCESS);
|
||||
|
@ -1408,11 +1408,11 @@ int32_t vectorMathSub(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *p
|
|||
SCL_ERR_JRET(getVectorBigintValueFn(pRightCol->info.type, &getVectorBigintValueFnRight));
|
||||
|
||||
if (pLeft->numOfRows == 1 && pRight->numOfRows == 1) {
|
||||
SCL_ERR_JRET(vectorMathTsSubHelper(pLeftCol, pRightCol, pOutputCol, pLeft->numOfRows, step, 1, i));
|
||||
SCL_ERR_JRET(vectorMathTsSubHelper(pLeftCol, pRightCol, pOutputCol, pLeft->numOfRows, step, 1, i, pLeft->tz));
|
||||
} else if (pLeft->numOfRows == 1) {
|
||||
SCL_ERR_JRET(vectorMathTsSubHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, -1, i));
|
||||
SCL_ERR_JRET(vectorMathTsSubHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, -1, i, pLeft->tz));
|
||||
} else if (pRight->numOfRows == 1) {
|
||||
SCL_ERR_JRET(vectorMathTsSubHelper(pLeftCol, pRightCol, pOutputCol, pLeft->numOfRows, step, 1, i));
|
||||
SCL_ERR_JRET(vectorMathTsSubHelper(pLeftCol, pRightCol, pOutputCol, pLeft->numOfRows, step, 1, i, pLeft->tz));
|
||||
} else if (pLeft->numOfRows == pRight->numOfRows) {
|
||||
for (; i < pRight->numOfRows && i >= 0; i += step, output += 1) {
|
||||
if (IS_NULL) {
|
||||
|
@ -2042,12 +2042,8 @@ int32_t vectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPara
|
|||
SScalarParam *param1 = NULL;
|
||||
SScalarParam *param2 = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
pRight->tz = pLeft->tz;
|
||||
pRight->charsetCxt = pLeft->charsetCxt;
|
||||
pLeftOut.tz = pLeft->tz;
|
||||
pLeftOut.charsetCxt = pLeft->charsetCxt;
|
||||
pRightOut.tz = pRight->tz;
|
||||
pRightOut.charsetCxt = pRight->charsetCxt;
|
||||
setTzCharset(&pLeftOut, pLeft->tz, pLeft->charsetCxt);
|
||||
setTzCharset(&pRightOut, pLeft->tz, pLeft->charsetCxt);
|
||||
if (noConvertBeforeCompare(GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight), optr)) {
|
||||
param1 = pLeft;
|
||||
param2 = pRight;
|
||||
|
@ -2131,10 +2127,6 @@ int32_t vectorIsNull(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pO
|
|||
}
|
||||
|
||||
int32_t vectorNotNull(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord) {
|
||||
if (pRight != NULL) {
|
||||
pRight->tz = pLeft->tz;
|
||||
pRight->charsetCxt = pLeft->charsetCxt;
|
||||
}
|
||||
for (int32_t i = 0; i < pLeft->numOfRows; ++i) {
|
||||
int8_t v = IS_HELPER_NULL(pLeft->columnData, i) ? 0 : 1;
|
||||
if (v) {
|
||||
|
@ -2148,11 +2140,6 @@ int32_t vectorNotNull(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *p
|
|||
}
|
||||
|
||||
int32_t vectorIsTrue(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord) {
|
||||
if (pRight != NULL) {
|
||||
pRight->tz = pLeft->tz;
|
||||
pRight->charsetCxt = pLeft->charsetCxt;
|
||||
|
||||
}
|
||||
SCL_ERR_RET(vectorConvertSingleColImpl(pLeft, pOut, NULL, -1, -1));
|
||||
for (int32_t i = 0; i < pOut->numOfRows; ++i) {
|
||||
if (colDataIsNull_s(pOut->columnData, i)) {
|
||||
|
|
|
@ -610,7 +610,7 @@ struct tm *taosLocalTime(const time_t *timep, struct tm *result, char *buf, int3
|
|||
(void)snprintf(buf, bufSize, "NaN");
|
||||
}
|
||||
#endif
|
||||
return result;
|
||||
return res;
|
||||
}
|
||||
|
||||
int32_t taosGetTimestampSec() { return (int32_t)time(NULL); }
|
||||
|
|
Loading…
Reference in New Issue