fix(stream): the timestamp when the stream is created is set to be the initial force_window_close start time.
This commit is contained in:
parent
5e49268cd8
commit
58886a0264
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "streamInt.h"
|
||||
#include "ttime.h"
|
||||
|
||||
static int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) {
|
||||
*pSubmit = NULL;
|
||||
|
@ -330,7 +331,7 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigge
|
|||
.intervalUnit = pInterval->intervalUnit,
|
||||
.slidingUnit = pInterval->slidingUnit};
|
||||
|
||||
ts = taosGetTimestampMs();
|
||||
ts = taosGetTimestamp(pInterval->precision);
|
||||
|
||||
if (pLatestWindow->skey == INT64_MIN) {
|
||||
STimeWindow window = getAlignQueryTimeWindow(&interval, ts - trigger);
|
||||
|
|
|
@ -24,7 +24,7 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
|||
int64_t delay = 0;
|
||||
int32_t code = 0;
|
||||
const char* id = pTask->id.idStr;
|
||||
int64_t* pTaskRefId = NULL;
|
||||
int64_t* pTaskRefId = NULL;
|
||||
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
return;
|
||||
|
@ -41,7 +41,6 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
|||
return;
|
||||
}
|
||||
|
||||
pTask->status.latestForceWindow = lastTimeWindow;
|
||||
pTask->info.delaySchedParam = interval.sliding;
|
||||
pTask->info.watermark = waterMark;
|
||||
pTask->info.interval = interval;
|
||||
|
@ -51,9 +50,17 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
|||
STimeWindow curWin = getAlignQueryTimeWindow(&pTask->info.interval, now);
|
||||
delay = (curWin.ekey + 1) - now + waterMark;
|
||||
|
||||
if (lastTimeWindow.skey == INT64_MIN) { // start from now, not the exec task timestamp after delay
|
||||
pTask->status.latestForceWindow.skey = curWin.skey - pTask->info.interval.interval;
|
||||
pTask->status.latestForceWindow.ekey = now;
|
||||
} else {
|
||||
pTask->status.latestForceWindow = lastTimeWindow;
|
||||
}
|
||||
|
||||
stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64
|
||||
" unit:%c, initial start after:%" PRId64,
|
||||
id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay);
|
||||
" unit:%c, initial start after:%" PRId64" last_win:%"PRId64"-%"PRId64,
|
||||
id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay, pTask->status.latestForceWindow.skey,
|
||||
pTask->status.latestForceWindow.ekey);
|
||||
} else {
|
||||
delay = pTask->info.delaySchedParam;
|
||||
if (delay == 0) {
|
||||
|
@ -199,16 +206,11 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (streamTaskShouldPause(pTask)) {
|
||||
stDebug("s-task:%s is paused, check in nextTrigger:%ds", id, nextTrigger/1000);
|
||||
streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
|
||||
"sched-run-tmr");
|
||||
}
|
||||
|
||||
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
|
||||
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
|
||||
} else {
|
||||
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
int32_t num = 0;
|
||||
SStreamTrigger* pTrigger = NULL;
|
||||
|
||||
while (1) {
|
||||
|
@ -227,6 +229,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
goto _end;
|
||||
}
|
||||
|
||||
num += 1;
|
||||
|
||||
// check whether the time window gaps exist or not
|
||||
int64_t now = taosGetTimestamp(pTask->info.interval.precision);
|
||||
int64_t intervalEndTs = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval;
|
||||
|
@ -235,13 +239,14 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
STimeWindow w = pTrigger->pBlock->info.window;
|
||||
w.ekey = w.skey + pTask->info.interval.interval;
|
||||
if (w.skey <= pTask->status.latestForceWindow.skey) {
|
||||
stFatal("s-task:%s invalid new time window in force_window_close model, skey:%" PRId64
|
||||
stFatal("s-task:%s invalid new time window in force_window_close trigger model, skey:%" PRId64
|
||||
" should be greater than latestForceWindow skey:%" PRId64,
|
||||
pTask->id.idStr, w.skey, pTask->status.latestForceWindow.skey);
|
||||
}
|
||||
|
||||
pTask->status.latestForceWindow = w;
|
||||
if (intervalEndTs + pTask->info.watermark + pTask->info.interval.interval > now) {
|
||||
stDebug("s-task:%s generate %d time window(s)", id, num);
|
||||
break;
|
||||
} else {
|
||||
stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey);
|
||||
|
|
Loading…
Reference in New Issue