This commit is contained in:
dapan1121 2021-04-23 11:40:57 +08:00
parent 61d373e206
commit fcb36f038b
1 changed files with 5 additions and 6 deletions

View File

@ -484,26 +484,25 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) { static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (stime == INT64_MIN) {
return stime;
}
if (pStream->isProject) { if (pStream->isProject) {
// no data in table, flush all data till now to destination meter, 10sec delay // no data in table, flush all data till now to destination meter, 10sec delay
pStream->interval.interval = tsProjectExecInterval; pStream->interval.interval = tsProjectExecInterval;
pStream->interval.sliding = tsProjectExecInterval; pStream->interval.sliding = tsProjectExecInterval;
if (stime != 0) { // first projection start from the latest event timestamp if (stime != INT64_MIN) { // first projection start from the latest event timestamp
assert(stime >= pQueryInfo->window.skey); assert(stime >= pQueryInfo->window.skey);
stime += 1; // exclude the last records from table stime += 1; // exclude the last records from table
} else { } else {
stime = pQueryInfo->window.skey; stime = pQueryInfo->window.skey;
} }
} else { // timewindow based aggregation stream } else { // timewindow based aggregation stream
if (stime == 0) { // no data in meter till now if (stime == INT64_MIN) { // no data in meter till now
if (pQueryInfo->window.skey != INT64_MIN) { if (pQueryInfo->window.skey != INT64_MIN) {
stime = pQueryInfo->window.skey; stime = pQueryInfo->window.skey;
} else {
return stime;
} }
stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision); stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
} else { } else {
int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision); int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);