enh(stream): generate the force_window_close trigger block.

This commit is contained in:
Haojun Liao 2024-09-20 19:18:35 +08:00
parent 71df0e0781
commit f3b42fd9d1
3 changed files with 27 additions and 17 deletions

View File

@ -84,7 +84,6 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
if (code) { if (code) {
return code; return code;
} }
} }

View File

@ -332,6 +332,7 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerTyp
STimeWindow window = getAlignQueryTimeWindow(&interval, now - trigger); STimeWindow window = getAlignQueryTimeWindow(&interval, now - trigger);
p->pBlock->info.window = window; p->pBlock->info.window = window;
p->pBlock->info.type = STREAM_GET_RESULT; p->pBlock->info.type = STREAM_GET_RESULT;
stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, window.skey, window.ekey);
} else { } else {
p->pBlock->info.type = STREAM_GET_ALL; p->pBlock->info.type = STREAM_GET_ALL;
} }

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "ttime.h"
#include "streamInt.h" #include "streamInt.h"
#include "ttimer.h" #include "ttimer.h"
@ -20,9 +21,10 @@ static void streamTaskResumeHelper(void* param, void* tmrId);
static void streamTaskSchedHelper(void* param, void* tmrId); static void streamTaskSchedHelper(void* param, void* tmrId);
void streamSetupScheduleTrigger(SStreamTask* pTask) { void streamSetupScheduleTrigger(SStreamTask* pTask) {
int64_t delay = 0; int64_t delay = 0;
int32_t code = 0; int32_t code = 0;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
return; return;
} }
@ -32,21 +34,28 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
int64_t waterMark = 0; int64_t waterMark = 0;
SInterval interval = {0}; SInterval interval = {0};
code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval); code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval);
if (code == 0) { if (code) {
pTask->info.delaySchedParam = interval.sliding; stError("s-task:%s failed to init scheduler info, code:%s", id, tstrerror(code));
pTask->info.watermark = waterMark; return;
pTask->info.interval = interval;
} }
// todo: calculate the correct start delay time for force_window_close pTask->info.delaySchedParam = interval.sliding;
delay = pTask->info.delaySchedParam; pTask->info.watermark = waterMark;
stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64 pTask->info.interval = interval;
" unit:%c ",
id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit);
}
if (delay == 0) { // calculate the first start timestamp
return; int64_t now = taosGetTimestamp(interval.precision);
STimeWindow curWin = getAlignQueryTimeWindow(&pTask->info.interval, now);
delay = (curWin.ekey + 1) - now + waterMark;
stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64
" unit:%c, initial start after:%" PRId64,
id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay);
} else {
delay = pTask->info.delaySchedParam;
if (delay == 0) {
return;
}
} }
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
@ -153,7 +162,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
} else { } else {
if (status == TASK_TRIGGER_STATUS__ACTIVE) { if ((status == TASK_TRIGGER_STATUS__ACTIVE) ||
(pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE)) {
SStreamTrigger* pTrigger; SStreamTrigger* pTrigger;
int32_t code = streamCreateSinkResTrigger(&pTrigger, pTask->info.trigger, pTask->info.delaySchedParam); int32_t code = streamCreateSinkResTrigger(&pTrigger, pTask->info.trigger, pTask->info.delaySchedParam);
@ -168,7 +178,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to put retrieve block into trigger, code:%s", pTask->id.idStr, tstrerror(code)); stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code));
goto _end; goto _end;
} }