td-1099: natual interval in stream and bug fix
This commit is contained in:
parent
1afed4c067
commit
e967332096
|
@ -367,6 +367,8 @@ typedef struct SSqlStream {
|
||||||
uint32_t streamId;
|
uint32_t streamId;
|
||||||
char listed;
|
char listed;
|
||||||
bool isProject;
|
bool isProject;
|
||||||
|
char intervalTimeUnit;
|
||||||
|
char slidingTimeUnit;
|
||||||
int16_t precision;
|
int16_t precision;
|
||||||
int64_t num; // number of computing count
|
int64_t num; // number of computing count
|
||||||
|
|
||||||
|
@ -380,7 +382,7 @@ typedef struct SSqlStream {
|
||||||
int64_t ctime; // stream created time
|
int64_t ctime; // stream created time
|
||||||
int64_t stime; // stream next executed time
|
int64_t stime; // stream next executed time
|
||||||
int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed
|
int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed
|
||||||
int64_t interval;
|
int64_t intervalTime;
|
||||||
int64_t slidingTime;
|
int64_t slidingTime;
|
||||||
void * pTimer;
|
void * pTimer;
|
||||||
|
|
||||||
|
|
|
@ -259,11 +259,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
||||||
pSdesc->num = htobe64(pStream->num);
|
pSdesc->num = htobe64(pStream->num);
|
||||||
|
|
||||||
pSdesc->useconds = htobe64(pStream->useconds);
|
pSdesc->useconds = htobe64(pStream->useconds);
|
||||||
pSdesc->stime = htobe64(pStream->stime - pStream->interval);
|
pSdesc->stime = htobe64(pStream->stime - pStream->intervalTime);
|
||||||
pSdesc->ctime = htobe64(pStream->ctime);
|
pSdesc->ctime = htobe64(pStream->ctime);
|
||||||
|
|
||||||
pSdesc->slidingTime = htobe64(pStream->slidingTime);
|
pSdesc->slidingTime = htobe64(pStream->slidingTime);
|
||||||
pSdesc->interval = htobe64(pStream->interval);
|
pSdesc->interval = htobe64(pStream->intervalTime);
|
||||||
|
|
||||||
pHeartbeat->numOfStreams++;
|
pHeartbeat->numOfStreams++;
|
||||||
pSdesc++;
|
pSdesc++;
|
||||||
|
|
|
@ -46,22 +46,23 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t tscGetRetryDelayTime(int64_t slidingTime, int16_t prec) {
|
static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, int16_t prec) {
|
||||||
float retryRangeFactor = 0.3f;
|
float retryRangeFactor = 0.3f;
|
||||||
|
|
||||||
// change to ms
|
|
||||||
if (prec == TSDB_TIME_PRECISION_MICRO) {
|
|
||||||
slidingTime = slidingTime / 1000;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t retryDelta = (int64_t)(tsStreamCompRetryDelay * retryRangeFactor);
|
int64_t retryDelta = (int64_t)(tsStreamCompRetryDelay * retryRangeFactor);
|
||||||
retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L;
|
retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L;
|
||||||
|
|
||||||
if (slidingTime < retryDelta) {
|
if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') {
|
||||||
return slidingTime;
|
// change to ms
|
||||||
} else {
|
if (prec == TSDB_TIME_PRECISION_MICRO) {
|
||||||
return retryDelta;
|
slidingTime = slidingTime / 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (slidingTime < retryDelta) {
|
||||||
|
return slidingTime;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return retryDelta;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
|
static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
|
||||||
|
@ -86,7 +87,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
|
||||||
|
|
||||||
// failed to get meter/metric meta, retry in 10sec.
|
// failed to get meter/metric meta, retry in 10sec.
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
|
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision);
|
||||||
tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
|
tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
|
||||||
tscSetRetryTimer(pStream, pSql, retryDelayTime);
|
tscSetRetryTimer(pStream, pSql, retryDelayTime);
|
||||||
|
|
||||||
|
@ -131,13 +132,17 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
|
||||||
}
|
}
|
||||||
if (etime > pStream->etime) {
|
if (etime > pStream->etime) {
|
||||||
etime = pStream->etime;
|
etime = pStream->etime;
|
||||||
|
} else if (pStream->intervalTimeUnit != 'y' && pStream->intervalTimeUnit != 'n') {
|
||||||
|
etime = pStream->stime + (etime - pStream->stime) / pStream->intervalTime * pStream->intervalTime;
|
||||||
} else {
|
} else {
|
||||||
etime = pStream->stime + (etime - pStream->stime) / pStream->interval * pStream->interval;
|
etime = taosGetIntervalStartTimestamp(etime, pStream->slidingTime, pStream->intervalTime, pStream->slidingTimeUnit, pStream->precision);
|
||||||
}
|
}
|
||||||
pQueryInfo->window.ekey = etime;
|
pQueryInfo->window.ekey = etime;
|
||||||
if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) {
|
if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) {
|
||||||
int64_t timer = pStream->slidingTime;
|
int64_t timer = pStream->slidingTime;
|
||||||
if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
|
if (pStream->intervalTimeUnit == 'y' || pStream->intervalTimeUnit == 'n') {
|
||||||
|
timer = 86400 * 1000l;
|
||||||
|
} else if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
timer /= 1000l;
|
timer /= 1000l;
|
||||||
}
|
}
|
||||||
tscSetRetryTimer(pStream, pSql, timer);
|
tscSetRetryTimer(pStream, pSql, timer);
|
||||||
|
@ -157,7 +162,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
|
||||||
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) {
|
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) {
|
||||||
SSqlStream *pStream = (SSqlStream *)param;
|
SSqlStream *pStream = (SSqlStream *)param;
|
||||||
if (tres == NULL || numOfRows < 0) {
|
if (tres == NULL || numOfRows < 0) {
|
||||||
int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
|
int64_t retryDelay = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision);
|
||||||
tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows,
|
tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows,
|
||||||
retryDelay);
|
retryDelay);
|
||||||
|
|
||||||
|
@ -218,7 +223,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
|
||||||
SSqlObj * pSql = (SSqlObj *)res;
|
SSqlObj * pSql = (SSqlObj *)res;
|
||||||
|
|
||||||
if (pSql == NULL || numOfRows < 0) {
|
if (pSql == NULL || numOfRows < 0) {
|
||||||
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
|
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->slidingTime, pStream->precision);
|
||||||
tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
|
tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
|
||||||
|
|
||||||
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
|
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
|
||||||
|
@ -241,7 +246,11 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pStream->isProject) {
|
if (!pStream->isProject) {
|
||||||
pStream->stime += pStream->slidingTime;
|
if (pStream->intervalTimeUnit == 'y' || pStream->intervalTimeUnit == 'n') {
|
||||||
|
pStream->stime = taosAddNatualInterval(pStream->stime, pStream->slidingTime, pStream->slidingTimeUnit, pStream->precision);
|
||||||
|
} else {
|
||||||
|
pStream->stime += pStream->slidingTime;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// actually only one row is returned. this following is not necessary
|
// actually only one row is returned. this following is not necessary
|
||||||
taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream);
|
taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream);
|
||||||
|
@ -301,7 +310,7 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
|
||||||
now + timer, timer, delay, pStream->stime, etime);
|
now + timer, timer, delay, pStream->stime, etime);
|
||||||
} else {
|
} else {
|
||||||
tscDebug("%p stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql, pStream,
|
tscDebug("%p stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql, pStream,
|
||||||
pStream->stime, timer, delay, pStream->stime - pStream->interval, pStream->stime - 1);
|
pStream->stime, timer, delay, pStream->stime - pStream->intervalTime, pStream->stime - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
pSql->cmd.command = TSDB_SQL_SELECT;
|
pSql->cmd.command = TSDB_SQL_SELECT;
|
||||||
|
@ -311,23 +320,26 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
|
static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
|
||||||
int64_t delayDelta = (int64_t)(pStream->slidingTime * tsStreamComputDelayRatio);
|
|
||||||
|
|
||||||
int64_t maxDelay =
|
int64_t maxDelay =
|
||||||
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay;
|
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMaxStreamComputDelay * 1000L : tsMaxStreamComputDelay;
|
||||||
|
|
||||||
if (delayDelta > maxDelay) {
|
int64_t delayDelta = maxDelay;
|
||||||
delayDelta = maxDelay;
|
if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') {
|
||||||
}
|
delayDelta = pStream->slidingTime * tsStreamComputDelayRatio;
|
||||||
|
if (delayDelta > maxDelay) {
|
||||||
int64_t remainTimeWindow = pStream->slidingTime - delayDelta;
|
delayDelta = maxDelay;
|
||||||
if (maxDelay > remainTimeWindow) {
|
}
|
||||||
maxDelay = (int64_t)(remainTimeWindow / 1.5f);
|
int64_t remainTimeWindow = pStream->slidingTime - delayDelta;
|
||||||
|
if (maxDelay > remainTimeWindow) {
|
||||||
|
maxDelay = (int64_t)(remainTimeWindow / 1.5f);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t currentDelay = (rand() % maxDelay); // a random number
|
int64_t currentDelay = (rand() % maxDelay); // a random number
|
||||||
currentDelay += delayDelta;
|
currentDelay += delayDelta;
|
||||||
assert(currentDelay < pStream->slidingTime);
|
if (pStream->intervalTimeUnit != 'n' && pStream->intervalTimeUnit != 'y') {
|
||||||
|
assert(currentDelay < pStream->slidingTime);
|
||||||
|
}
|
||||||
|
|
||||||
return currentDelay;
|
return currentDelay;
|
||||||
}
|
}
|
||||||
|
@ -354,7 +366,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if ((pStream->stime - pStream->interval) >= pStream->etime) {
|
int64_t stime = taosGetIntervalStartTimestamp(pStream->stime - 1, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision);
|
||||||
|
if (stime >= pStream->etime) {
|
||||||
tscDebug("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream,
|
tscDebug("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream,
|
||||||
pStream->stime, pStream->etime);
|
pStream->stime, pStream->etime);
|
||||||
// TODO : How to terminate stream here
|
// TODO : How to terminate stream here
|
||||||
|
@ -387,24 +400,24 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
|
||||||
|
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||||
|
|
||||||
if (pQueryInfo->intervalTime < minIntervalTime) {
|
if (pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y' && pQueryInfo->intervalTime < minIntervalTime) {
|
||||||
tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%" PRId64, pSql, pStream,
|
tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%" PRId64, pSql, pStream,
|
||||||
pQueryInfo->intervalTime, minIntervalTime);
|
pQueryInfo->intervalTime, minIntervalTime);
|
||||||
pQueryInfo->intervalTime = minIntervalTime;
|
pQueryInfo->intervalTime = minIntervalTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
pStream->interval = pQueryInfo->intervalTime; // it shall be derived from sql string
|
pStream->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
|
||||||
|
pStream->intervalTime = pQueryInfo->intervalTime; // it shall be derived from sql string
|
||||||
|
|
||||||
if (pQueryInfo->slidingTime == 0) {
|
if (pQueryInfo->slidingTime <= 0) {
|
||||||
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
|
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
|
||||||
|
pQueryInfo->slidingTimeUnit = pQueryInfo->intervalTimeUnit;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t minSlidingTime =
|
int64_t minSlidingTime =
|
||||||
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
|
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
|
||||||
|
|
||||||
if (pQueryInfo->slidingTime == -1) {
|
if (pQueryInfo->intervalTimeUnit != 'n' && pQueryInfo->intervalTimeUnit != 'y' && pQueryInfo->slidingTime < minSlidingTime) {
|
||||||
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
|
|
||||||
} else if (pQueryInfo->slidingTime < minSlidingTime) {
|
|
||||||
tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream,
|
tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream,
|
||||||
pQueryInfo->slidingTime, minSlidingTime);
|
pQueryInfo->slidingTime, minSlidingTime);
|
||||||
|
|
||||||
|
@ -418,6 +431,7 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
|
||||||
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
|
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pStream->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
|
||||||
pStream->slidingTime = pQueryInfo->slidingTime;
|
pStream->slidingTime = pQueryInfo->slidingTime;
|
||||||
|
|
||||||
if (pStream->isProject) {
|
if (pStream->isProject) {
|
||||||
|
@ -431,7 +445,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
|
||||||
|
|
||||||
if (pStream->isProject) {
|
if (pStream->isProject) {
|
||||||
// no data in table, flush all data till now to destination meter, 10sec delay
|
// no data in table, flush all data till now to destination meter, 10sec delay
|
||||||
pStream->interval = tsProjectExecInterval;
|
pStream->intervalTime = tsProjectExecInterval;
|
||||||
pStream->slidingTime = tsProjectExecInterval;
|
pStream->slidingTime = tsProjectExecInterval;
|
||||||
|
|
||||||
if (stime != 0) { // first projection start from the latest event timestamp
|
if (stime != 0) { // first projection start from the latest event timestamp
|
||||||
|
@ -442,11 +456,15 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
|
||||||
}
|
}
|
||||||
} else { // timewindow based aggregation stream
|
} else { // timewindow based aggregation stream
|
||||||
if (stime == 0) { // no data in meter till now
|
if (stime == 0) { // no data in meter till now
|
||||||
stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval;
|
stime = pQueryInfo->window.skey;
|
||||||
stime -= pStream->interval;
|
if (stime == INT64_MIN) {
|
||||||
tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime);
|
stime = (int64_t)taosGetTimestamp(pStream->precision);
|
||||||
|
stime = taosGetIntervalStartTimestamp(stime, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision);
|
||||||
|
stime = taosGetIntervalStartTimestamp(stime - 1, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision);
|
||||||
|
tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
int64_t newStime = (stime / pStream->interval) * pStream->interval;
|
int64_t newStime = taosGetIntervalStartTimestamp(stime, pStream->intervalTime, pStream->intervalTime, pStream->intervalTimeUnit, pStream->precision);
|
||||||
if (newStime != stime) {
|
if (newStime != stime) {
|
||||||
tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime);
|
tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime);
|
||||||
stime = newStime;
|
stime = newStime;
|
||||||
|
@ -516,7 +534,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
|
||||||
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
|
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
|
||||||
|
|
||||||
tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
|
tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
|
||||||
pStream, pTableMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, pSql->sqlstr);
|
pStream, pTableMetaInfo->name, pStream->intervalTime, pStream->slidingTime, starttime, pSql->sqlstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||||
|
|
|
@ -35,6 +35,8 @@ bool tscValidateTableNameLength(size_t len);
|
||||||
|
|
||||||
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
|
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
|
||||||
|
|
||||||
|
int64_t taosAddNatualInterval(int64_t key, int64_t intervalTime, char timeUnit, int16_t precision);
|
||||||
|
int32_t taosCountNatualInterval(int64_t skey, int64_t ekey, int64_t intervalTime, char timeUnit, int16_t precision);
|
||||||
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision);
|
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision);
|
||||||
|
|
||||||
#endif // TDENGINE_NAME_H
|
#endif // TDENGINE_NAME_H
|
||||||
|
|
|
@ -100,6 +100,62 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO
|
||||||
return pFilter;
|
return pFilter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t taosAddNatualInterval(int64_t key, int64_t intervalTime, char timeUnit, int16_t precision) {
|
||||||
|
key /= 1000;
|
||||||
|
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
|
key /= 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct tm tm;
|
||||||
|
time_t t = (time_t)key;
|
||||||
|
localtime_r(&t, &tm);
|
||||||
|
|
||||||
|
if (timeUnit == 'y') {
|
||||||
|
intervalTime *= 12;
|
||||||
|
}
|
||||||
|
|
||||||
|
int mon = tm.tm_year * 12 + tm.tm_mon + intervalTime;
|
||||||
|
tm.tm_year = mon / 12;
|
||||||
|
tm.tm_mon = mon % 12;
|
||||||
|
|
||||||
|
key = mktime(&tm) * 1000L;
|
||||||
|
|
||||||
|
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
|
key *= 1000L;
|
||||||
|
}
|
||||||
|
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosCountNatualInterval(int64_t skey, int64_t ekey, int64_t intervalTime, char timeUnit, int16_t precision) {
|
||||||
|
skey /= 1000;
|
||||||
|
ekey /= 1000;
|
||||||
|
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
|
skey /= 1000;
|
||||||
|
ekey /= 1000;
|
||||||
|
}
|
||||||
|
if (ekey < skey) {
|
||||||
|
int64_t tmp = ekey;
|
||||||
|
ekey = skey;
|
||||||
|
skey = tmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct tm tm;
|
||||||
|
time_t t = (time_t)skey;
|
||||||
|
localtime_r(&t, &tm);
|
||||||
|
int smon = tm.tm_year * 12 + tm.tm_mon;
|
||||||
|
|
||||||
|
t = (time_t)ekey;
|
||||||
|
localtime_r(&t, &tm);
|
||||||
|
int emon = tm.tm_year * 12 + tm.tm_mon;
|
||||||
|
|
||||||
|
if (timeUnit == 'y') {
|
||||||
|
intervalTime *= 12;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (emon - smon) / (int32_t)intervalTime;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
|
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
|
||||||
if (slidingTime == 0) {
|
if (slidingTime == 0) {
|
||||||
return startTime;
|
return startTime;
|
||||||
|
|
|
@ -137,33 +137,6 @@ static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
|
||||||
|
|
||||||
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
|
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
|
||||||
|
|
||||||
static int64_t addNatualInterval(int64_t key, int64_t intervalTime, char intervalTimeUnit, int precision) {
|
|
||||||
key /= 1000;
|
|
||||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
|
||||||
key /= 1000;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct tm tm;
|
|
||||||
time_t t = (time_t)key;
|
|
||||||
localtime_r(&t, &tm);
|
|
||||||
|
|
||||||
if (intervalTimeUnit == 'y') {
|
|
||||||
intervalTime *= 12;
|
|
||||||
}
|
|
||||||
|
|
||||||
int mon = tm.tm_year * 12 + tm.tm_mon + intervalTime;
|
|
||||||
tm.tm_year = mon / 12;
|
|
||||||
tm.tm_mon = mon % 12;
|
|
||||||
|
|
||||||
key = mktime(&tm) * 1000L;
|
|
||||||
|
|
||||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
|
||||||
key *= 1000L;
|
|
||||||
}
|
|
||||||
|
|
||||||
return key;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
|
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
|
||||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
if (pQuery->intervalTimeUnit != 'n' && pQuery->intervalTimeUnit != 'y') {
|
if (pQuery->intervalTimeUnit != 'n' && pQuery->intervalTimeUnit != 'y') {
|
||||||
|
@ -528,7 +501,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
|
||||||
if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value
|
if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value
|
||||||
w.skey = pWindowResInfo->prevSKey;
|
w.skey = pWindowResInfo->prevSKey;
|
||||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||||
w.ekey = addNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
w.ekey = taosAddNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||||
} else {
|
} else {
|
||||||
w.ekey = w.skey + pQuery->intervalTime - 1;
|
w.ekey = w.skey + pQuery->intervalTime - 1;
|
||||||
}
|
}
|
||||||
|
@ -541,7 +514,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
|
||||||
if (w.skey > ts || w.ekey < ts) {
|
if (w.skey > ts || w.ekey < ts) {
|
||||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||||
w.skey = taosGetIntervalStartTimestamp(ts, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
w.skey = taosGetIntervalStartTimestamp(ts, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
||||||
w.ekey = addNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
w.ekey = taosAddNatualInterval(w.skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||||
} else {
|
} else {
|
||||||
int64_t st = w.skey;
|
int64_t st = w.skey;
|
||||||
|
|
||||||
|
@ -883,7 +856,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
|
||||||
TSKEY next = primaryKeys[startPos];
|
TSKEY next = primaryKeys[startPos];
|
||||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||||
pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
||||||
pNext->ekey = addNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
pNext->ekey = taosAddNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||||
} else {
|
} else {
|
||||||
pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
|
pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
|
||||||
pNext->skey = pNext->ekey - pQuery->intervalTime + 1;
|
pNext->skey = pNext->ekey - pQuery->intervalTime + 1;
|
||||||
|
@ -892,7 +865,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
|
||||||
TSKEY next = primaryKeys[startPos];
|
TSKEY next = primaryKeys[startPos];
|
||||||
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||||
pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
pNext->skey = taosGetIntervalStartTimestamp(next, pQuery->slidingTime, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision);
|
||||||
pNext->ekey = addNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
pNext->ekey = taosAddNatualInterval(pNext->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||||
} else {
|
} else {
|
||||||
pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
|
pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
|
||||||
pNext->ekey = pNext->skey + pQuery->intervalTime - 1;
|
pNext->ekey = pNext->skey + pQuery->intervalTime - 1;
|
||||||
|
@ -1880,7 +1853,7 @@ void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int6
|
||||||
assert(keyLast - keyFirst < pQuery->intervalTime);
|
assert(keyLast - keyFirst < pQuery->intervalTime);
|
||||||
win->ekey = INT64_MAX;
|
win->ekey = INT64_MAX;
|
||||||
} else if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
} else if (pQuery->intervalTimeUnit == 'n' || pQuery->intervalTimeUnit == 'y') {
|
||||||
win->ekey = addNatualInterval(win->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
win->ekey = taosAddNatualInterval(win->skey, pQuery->intervalTime, pQuery->intervalTimeUnit, pQuery->precision) - 1;
|
||||||
} else {
|
} else {
|
||||||
win->ekey = win->skey + pQuery->intervalTime - 1;
|
win->ekey = win->skey + pQuery->intervalTime - 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -179,14 +179,22 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows
|
||||||
if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set.
|
if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set.
|
||||||
TSKEY lastKey = tsList[pFillInfo->numOfRows - 1];
|
TSKEY lastKey = tsList[pFillInfo->numOfRows - 1];
|
||||||
|
|
||||||
numOfRes = (int64_t)(ABS(lastKey - pFillInfo->start) / pFillInfo->slidingTime) + 1;
|
if (pFillInfo->slidingUnit != 'y' && pFillInfo->slidingUnit != 'n') {
|
||||||
|
numOfRes = (int64_t)(ABS(lastKey - pFillInfo->start) / pFillInfo->slidingTime) + 1;
|
||||||
|
} else {
|
||||||
|
numOfRes = taosCountNatualInterval(lastKey, pFillInfo->start, pFillInfo->slidingTime, pFillInfo->slidingUnit, pFillInfo->precision) + 1;
|
||||||
|
}
|
||||||
assert(numOfRes >= numOfRows);
|
assert(numOfRes >= numOfRows);
|
||||||
} else { // reach the end of data
|
} else { // reach the end of data
|
||||||
if ((ekey1 < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) ||
|
if ((ekey1 < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||||
(ekey1 > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) {
|
(ekey1 > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) {
|
||||||
return 0;
|
return 0;
|
||||||
} else { // the numOfRes rows are all filled with specified policy
|
}
|
||||||
|
// the numOfRes rows are all filled with specified policy
|
||||||
|
if (pFillInfo->slidingUnit != 'y' && pFillInfo->slidingUnit != 'n') {
|
||||||
numOfRes = (ABS(ekey1 - pFillInfo->start) / pFillInfo->slidingTime) + 1;
|
numOfRes = (ABS(ekey1 - pFillInfo->start) / pFillInfo->slidingTime) + 1;
|
||||||
|
} else {
|
||||||
|
numOfRes = taosCountNatualInterval(ekey1, pFillInfo->start, pFillInfo->slidingTime, pFillInfo->slidingUnit, pFillInfo->precision) + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -366,7 +374,12 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu
|
||||||
setTagsValue(pFillInfo, data, *num);
|
setTagsValue(pFillInfo, data, *num);
|
||||||
}
|
}
|
||||||
|
|
||||||
pFillInfo->start += (pFillInfo->slidingTime * step);
|
// TODO natual sliding time
|
||||||
|
if (pFillInfo->slidingUnit != 'n' && pFillInfo->slidingUnit != 'y') {
|
||||||
|
pFillInfo->start += (pFillInfo->slidingTime * step);
|
||||||
|
} else {
|
||||||
|
pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->slidingTime*step, pFillInfo->slidingUnit, pFillInfo->precision);
|
||||||
|
}
|
||||||
pFillInfo->numOfCurrent++;
|
pFillInfo->numOfCurrent++;
|
||||||
|
|
||||||
(*num) += 1;
|
(*num) += 1;
|
||||||
|
@ -473,7 +486,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
|
||||||
// set the tag value for final result
|
// set the tag value for final result
|
||||||
setTagsValue(pFillInfo, data, num);
|
setTagsValue(pFillInfo, data, num);
|
||||||
|
|
||||||
pFillInfo->start += (pFillInfo->slidingTime * step);
|
// TODO natual sliding time
|
||||||
|
if (pFillInfo->slidingUnit != 'n' && pFillInfo->slidingUnit != 'y') {
|
||||||
|
pFillInfo->start += (pFillInfo->slidingTime * step);
|
||||||
|
} else {
|
||||||
|
pFillInfo->start = taosAddNatualInterval(pFillInfo->start, pFillInfo->slidingTime*step, pFillInfo->slidingUnit, pFillInfo->precision);
|
||||||
|
}
|
||||||
pFillInfo->rowIdx += 1;
|
pFillInfo->rowIdx += 1;
|
||||||
|
|
||||||
pFillInfo->numOfCurrent +=1;
|
pFillInfo->numOfCurrent +=1;
|
||||||
|
|
Loading…
Reference in New Issue