From e753d631afa5f12f38881488300b8e58f0387995 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 Nov 2023 16:10:09 +0800 Subject: [PATCH] fix(stream): make sure open operator only once for scan-history task. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamExec.c | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5cb8c8aa32..a1b96cbd99 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -391,6 +391,7 @@ typedef struct SHistoryTaskInfo { int32_t retryTimes; int32_t waitInterval; int64_t haltVer; // offset in wal when halt the stream task + bool operatorOpen; // false by default } SHistoryTaskInfo; typedef struct STaskOutputInfo { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8ab8f3852e..25f32195be 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -256,7 +256,10 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { bool finished = false; const char* id = pTask->id.idStr; - qSetStreamOpOpen(exec); + if (!pTask->hTaskInfo.operatorOpen) { + qSetStreamOpOpen(exec); + pTask->hTaskInfo.operatorOpen = true; + } while (1) { if (streamTaskShouldPause(pTask)) {