Merge pull request #28964 from taosdata/fix/liaohj
fix(stream): the timestamp when the stream is created is set to be the initial force_window_close start time.
This commit is contained in:
commit
b695541ff6
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "streamInt.h"
|
||||
#include "ttime.h"
|
||||
|
||||
static int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) {
|
||||
*pSubmit = NULL;
|
||||
|
@ -307,13 +308,17 @@ void streamFreeQitem(SStreamQueueItem* data) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow, const char* id) {
|
||||
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t interval, SInterval* pInterval,
|
||||
STimeWindow* pLatestWindow, const char* id) {
|
||||
QRY_PARAM_CHECK(pTrigger);
|
||||
int64_t ts = INT64_MIN;
|
||||
|
||||
SStreamTrigger* p = NULL;
|
||||
int64_t ts = taosGetTimestamp(pInterval->precision);
|
||||
int64_t skey = pLatestWindow->skey + interval;
|
||||
|
||||
int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to create force_window trigger, code:%s", id, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -324,26 +329,10 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigge
|
|||
return terrno;
|
||||
}
|
||||
|
||||
// let's calculate the previous time window
|
||||
SInterval interval = {.interval = trigger,
|
||||
.sliding = trigger,
|
||||
.intervalUnit = pInterval->intervalUnit,
|
||||
.slidingUnit = pInterval->slidingUnit};
|
||||
|
||||
ts = taosGetTimestampMs();
|
||||
|
||||
if (pLatestWindow->skey == INT64_MIN) {
|
||||
STimeWindow window = getAlignQueryTimeWindow(&interval, ts - trigger);
|
||||
|
||||
p->pBlock->info.window.skey = window.skey;
|
||||
p->pBlock->info.window.ekey = TMAX(ts, window.ekey);
|
||||
} else {
|
||||
int64_t skey = pLatestWindow->skey + trigger;
|
||||
p->pBlock->info.window.skey = skey;
|
||||
p->pBlock->info.window.ekey = TMAX(ts, skey + trigger);
|
||||
}
|
||||
|
||||
p->pBlock->info.window.skey = skey;
|
||||
p->pBlock->info.window.ekey = TMAX(ts, skey + interval);
|
||||
p->pBlock->info.type = STREAM_GET_RESULT;
|
||||
|
||||
stDebug("s-task:%s force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, id,
|
||||
p->pBlock->info.window.skey, p->pBlock->info.window.ekey);
|
||||
|
||||
|
|
|
@ -17,6 +17,9 @@
|
|||
#include "streamInt.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
#define TRIGGER_RECHECK_INTERVAL (5 * 1000)
|
||||
#define INITIAL_TRIGGER_INTERVAL (120 * 1000)
|
||||
|
||||
static void streamTaskResumeHelper(void* param, void* tmrId);
|
||||
static void streamTaskSchedHelper(void* param, void* tmrId);
|
||||
|
||||
|
@ -24,7 +27,7 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
|||
int64_t delay = 0;
|
||||
int32_t code = 0;
|
||||
const char* id = pTask->id.idStr;
|
||||
int64_t* pTaskRefId = NULL;
|
||||
int64_t* pTaskRefId = NULL;
|
||||
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
return;
|
||||
|
@ -32,8 +35,8 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
|||
|
||||
// dynamic set the trigger & triggerParam for STREAM_TRIGGER_FORCE_WINDOW_CLOSE
|
||||
if ((pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) && (pTask->info.taskLevel == TASK_LEVEL__SOURCE)) {
|
||||
int64_t waterMark = 0;
|
||||
SInterval interval = {0};
|
||||
int64_t waterMark = 0;
|
||||
SInterval interval = {0};
|
||||
STimeWindow lastTimeWindow = {0};
|
||||
code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval, &lastTimeWindow);
|
||||
if (code) {
|
||||
|
@ -41,19 +44,37 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
|||
return;
|
||||
}
|
||||
|
||||
pTask->status.latestForceWindow = lastTimeWindow;
|
||||
pTask->info.delaySchedParam = interval.sliding;
|
||||
pTask->info.watermark = waterMark;
|
||||
pTask->info.interval = interval;
|
||||
|
||||
// calculate the first start timestamp
|
||||
int64_t now = taosGetTimestamp(interval.precision);
|
||||
int64_t now = taosGetTimestamp(interval.precision);
|
||||
STimeWindow curWin = getAlignQueryTimeWindow(&pTask->info.interval, now);
|
||||
delay = (curWin.ekey + 1) - now + waterMark;
|
||||
|
||||
if (lastTimeWindow.skey == INT64_MIN) { // start from now, not the exec task timestamp after delay
|
||||
pTask->status.latestForceWindow.skey = curWin.skey - pTask->info.interval.interval;
|
||||
pTask->status.latestForceWindow.ekey = now;
|
||||
|
||||
delay = (curWin.ekey + 1) - now + waterMark;
|
||||
delay = convertTimePrecision(delay, interval.precision, TSDB_TIME_PRECISION_MILLI);
|
||||
|
||||
} else {
|
||||
pTask->status.latestForceWindow = lastTimeWindow;
|
||||
// It's the current calculated time window
|
||||
int64_t calEkey = lastTimeWindow.skey + pTask->info.interval.interval * 2;
|
||||
if (calEkey + waterMark < now) { // unfinished time window existed
|
||||
delay = INITIAL_TRIGGER_INTERVAL; // wait for 2min to start to calculate
|
||||
} else {
|
||||
delay = (curWin.ekey + 1) - now + waterMark;
|
||||
delay = convertTimePrecision(delay, interval.precision, TSDB_TIME_PRECISION_MILLI);
|
||||
}
|
||||
}
|
||||
|
||||
stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64
|
||||
" unit:%c, initial start after:%" PRId64,
|
||||
id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay);
|
||||
" unit:%c, initial start after:%" PRId64 "ms last_win:%" PRId64 "-%" PRId64,
|
||||
id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay,
|
||||
pTask->status.latestForceWindow.skey, pTask->status.latestForceWindow.ekey);
|
||||
} else {
|
||||
delay = pTask->info.delaySchedParam;
|
||||
if (delay == 0) {
|
||||
|
@ -66,8 +87,8 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
|||
stDebug("s-task:%s refId:%" PRId64 " enable the scheduler trigger, delay:%" PRId64, pTask->id.idStr,
|
||||
pTask->id.refId, delay);
|
||||
|
||||
streamTmrStart(streamTaskSchedHelper, (int32_t)delay, pTaskRefId, streamTimer,
|
||||
&pTask->schedInfo.pDelayTimer, pTask->pMeta->vgId, "sched-tmr");
|
||||
streamTmrStart(streamTaskSchedHelper, (int32_t)delay, pTaskRefId, streamTimer, &pTask->schedInfo.pDelayTimer,
|
||||
pTask->pMeta->vgId, "sched-tmr");
|
||||
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
|
||||
}
|
||||
}
|
||||
|
@ -165,7 +186,62 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
|
|||
streamTaskFreeRefId(param);
|
||||
}
|
||||
|
||||
static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrigger) {
|
||||
int32_t num = 0;
|
||||
int32_t code = 0;
|
||||
const char* id = pTask->id.idStr;
|
||||
int8_t precision = pTask->info.interval.precision;
|
||||
SStreamTrigger* pTrigger = NULL;
|
||||
|
||||
while (1) {
|
||||
code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval,
|
||||
&pTask->status.latestForceWindow, id);
|
||||
if (code != 0) {
|
||||
*pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
|
||||
stError("s-task:%s failed to prepare force window close trigger, code:%s, try again in %dms", id,
|
||||
tstrerror(code), *pNextTrigger);
|
||||
return code;
|
||||
}
|
||||
|
||||
// in the force window close model, status trigger does not matter. So we do not set the trigger model
|
||||
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
num += 1;
|
||||
|
||||
// check whether the time window gaps exist or not
|
||||
int64_t now = taosGetTimestamp(precision);
|
||||
int64_t ekey = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval;
|
||||
|
||||
// there are gaps, needs to be filled
|
||||
STimeWindow w = pTrigger->pBlock->info.window;
|
||||
w.ekey = w.skey + pTask->info.interval.interval;
|
||||
if (w.skey <= pTask->status.latestForceWindow.skey) {
|
||||
stFatal("s-task:%s invalid new time window in force_window_close trigger model, skey:%" PRId64
|
||||
" should be greater than latestForceWindow skey:%" PRId64,
|
||||
pTask->id.idStr, w.skey, pTask->status.latestForceWindow.skey);
|
||||
}
|
||||
|
||||
pTask->status.latestForceWindow = w;
|
||||
if (ekey + pTask->info.watermark + pTask->info.interval.interval > now) {
|
||||
int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
|
||||
|
||||
*pNextTrigger = ekey + pTask->info.watermark + pTask->info.interval.interval - now;
|
||||
*pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
|
||||
stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d", id, num, prev,
|
||||
*pNextTrigger);
|
||||
return code;
|
||||
} else {
|
||||
stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void streamTaskSchedHelper(void* param, void* tmrId) {
|
||||
int32_t code = 0;
|
||||
int64_t taskRefId = *(int64_t*)param;
|
||||
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
|
||||
if (pTask == NULL) {
|
||||
|
@ -174,15 +250,20 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
return;
|
||||
}
|
||||
|
||||
stDebug("s-task:%s acquire task, refId:%"PRId64, pTask->id.idStr, pTask->id.refId);
|
||||
stDebug("s-task:%s acquire task, refId:%" PRId64, pTask->id.idStr, pTask->id.refId);
|
||||
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t nextTrigger = (int32_t)pTask->info.delaySchedParam;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t nextTrigger = (int32_t)pTask->info.delaySchedParam;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
|
||||
int8_t status = atomic_load_8(&pTask->schedInfo.status);
|
||||
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
|
||||
|
||||
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
int32_t next = convertTimePrecision(nextTrigger, pTask->info.interval.precision, TSDB_TIME_PRECISION_MILLI);
|
||||
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, next);
|
||||
} else {
|
||||
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
|
||||
}
|
||||
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
stDebug("s-task:%s should stop, jump out of schedTimer", id);
|
||||
|
@ -192,62 +273,30 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
if (streamTaskShouldPause(pTask)) {
|
||||
stDebug("s-task:%s is paused, recheck in %.2fs", id, nextTrigger/1000.0);
|
||||
streamTmrStart(streamTaskSchedHelper, nextTrigger, param, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
|
||||
"sched-run-tmr");
|
||||
stDebug("s-task:%s is paused, recheck in %.2fs", id, TRIGGER_RECHECK_INTERVAL / 1000.0);
|
||||
streamTmrStart(streamTaskSchedHelper, TRIGGER_RECHECK_INTERVAL, param, streamTimer, &pTask->schedInfo.pDelayTimer,
|
||||
vgId, "sched-run-tmr");
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
if (streamTaskShouldPause(pTask)) {
|
||||
stDebug("s-task:%s is paused, check in nextTrigger:%ds", id, nextTrigger/1000);
|
||||
streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
|
||||
"sched-run-tmr");
|
||||
if (pTask->status.downstreamReady == 0) {
|
||||
stDebug("s-task:%s downstream not ready, recheck in %.2fs", id, TRIGGER_RECHECK_INTERVAL / 1000.0);
|
||||
streamTmrStart(streamTaskSchedHelper, TRIGGER_RECHECK_INTERVAL, param, streamTimer, &pTask->schedInfo.pDelayTimer,
|
||||
vgId, "sched-run-tmr");
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
|
||||
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
|
||||
nextTrigger = TRIGGER_RECHECK_INTERVAL; // retry in 10 seec
|
||||
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, TRIGGER_RECHECK_INTERVAL);
|
||||
} else {
|
||||
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
SStreamTrigger* pTrigger = NULL;
|
||||
|
||||
while (1) {
|
||||
code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval,
|
||||
&pTask->status.latestForceWindow, id);
|
||||
if (code != 0) {
|
||||
stError("s-task:%s failed to prepare force window close trigger, code:%s, try again in %dms", id,
|
||||
tstrerror(code), nextTrigger);
|
||||
goto _end;
|
||||
}
|
||||
|
||||
// in the force window close model, status trigger does not matter. So we do not set the trigger model
|
||||
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
goto _end;
|
||||
}
|
||||
|
||||
// check whether the time window gaps exist or not
|
||||
int64_t now = taosGetTimestamp(pTask->info.interval.precision);
|
||||
int64_t intervalEndTs = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval;
|
||||
|
||||
// there are gaps, needs to be filled
|
||||
STimeWindow w = pTrigger->pBlock->info.window;
|
||||
w.ekey = w.skey + pTask->info.interval.interval;
|
||||
if (w.skey <= pTask->status.latestForceWindow.skey) {
|
||||
stFatal("s-task:%s invalid new time window in force_window_close model, skey:%" PRId64
|
||||
" should be greater than latestForceWindow skey:%" PRId64,
|
||||
pTask->id.idStr, w.skey, pTask->status.latestForceWindow.skey);
|
||||
}
|
||||
|
||||
pTask->status.latestForceWindow = w;
|
||||
if (intervalEndTs + pTask->info.watermark + pTask->info.interval.interval > now) {
|
||||
break;
|
||||
} else {
|
||||
stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey);
|
||||
}
|
||||
code = doCreateForceWindowTrigger(pTask, &nextTrigger);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
} else if (status == TASK_TRIGGER_STATUS__MAY_ACTIVE) {
|
||||
SStreamTrigger* pTrigger = NULL;
|
||||
code = streamCreateSinkResTrigger(&pTrigger);
|
||||
|
|
|
@ -4,7 +4,7 @@ system sh/exec.sh -n dnode1 -s start
|
|||
sleep 50
|
||||
sql connect
|
||||
|
||||
print =============== create database
|
||||
print ========================================== create database
|
||||
sql create database test vgroups 2;
|
||||
sql select * from information_schema.ins_databases
|
||||
if $rows != 3 then
|
||||
|
@ -18,7 +18,47 @@ sql create stable st(ts timestamp, a int) tags(t int);
|
|||
sql create table tu1 using st tags(1);
|
||||
|
||||
sql create stream stream1 trigger force_window_close into str_dst as select _wstart, count(*) from st partition by tbname interval(5s);
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into tu1 values(now, 1);
|
||||
sleep 5500
|
||||
|
||||
sql pause stream stream1
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop1:
|
||||
sleep 500
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
goto end_loop1
|
||||
endi
|
||||
|
||||
sql insert into tu1 values(now, 1);
|
||||
goto loop1
|
||||
|
||||
end_loop1:
|
||||
sql resume stream stream1
|
||||
sleep 5000
|
||||
|
||||
sql select sum(`count(*)`) from (select * from str_dst)
|
||||
|
||||
if $data00 != 20 then
|
||||
print expect 20, actual: $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql drop database test
|
||||
|
||||
print ===================================== micro precision db test
|
||||
print ============ create db
|
||||
sql create database test vgroups 2 precision 'us';
|
||||
|
||||
sql use test
|
||||
sql create stable st(ts timestamp, a int) tags(t int);
|
||||
sql create table tu1 using st tags(1);
|
||||
|
||||
sql create stream stream1 trigger force_window_close into str_dst as select _wstart, count(*) from st partition by tbname interval(5s);
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into tu1 values(now, 1);
|
||||
|
@ -41,10 +81,58 @@ goto loop0
|
|||
end_loop:
|
||||
|
||||
sql resume stream stream1
|
||||
sql select * from str_dst
|
||||
sleep 5000
|
||||
|
||||
if $rows != 3 then
|
||||
print expect 3, actual: $rows
|
||||
sql select sum(`count(*)`) from (select * from str_dst)
|
||||
|
||||
if $data00 != 20 then
|
||||
print expect 20, actual: $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql drop stream stream1
|
||||
sql drop table str_dst
|
||||
|
||||
print ============================= too long watermark test
|
||||
sql drop table tu1;
|
||||
sql create table tu1 using st tags(1);
|
||||
sql create stream stream2 trigger force_window_close watermark 30s into str_dst as select _wstart, count(*), now() from st partition by tbname interval(5s);
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop2:
|
||||
sleep 500
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
goto end_loop3
|
||||
endi
|
||||
|
||||
sql insert into tu1 values(now, 1);
|
||||
goto loop2
|
||||
|
||||
end_loop3:
|
||||
|
||||
sql select count(*) from str_dst
|
||||
print =================rows: $data00
|
||||
|
||||
if $data00 != 0 then
|
||||
print expect 0, actual $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 35000
|
||||
|
||||
sql select sum(`count(*)`) from (select * from str_dst)
|
||||
if $data00 != 19 then
|
||||
print expect 19, actual: $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select round(timediff(`now()`, `_wstart`)/1000000) from str_dst;
|
||||
if $data00 != 35.000000000 then
|
||||
print expect 35.000000000 , actual $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue