From 30d78c6be15bd270b90c739a385c16fcce849e5e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 Nov 2023 16:10:09 +0800 Subject: [PATCH] fix(stream): make sure open operator only once for scan-history task. --- include/libs/stream/tstream.h | 3 ++- source/libs/stream/src/streamExec.c | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7f65ef8358..9f88672231 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -402,7 +402,8 @@ typedef struct SHistoryTaskInfo { int32_t tickCount; int32_t retryTimes; int32_t waitInterval; - int64_t haltVer; // offset in wal when halt the stream task + int64_t haltVer; // offset in wal when halt the stream task + bool operatorOpen; // false by default } SHistoryTaskInfo; typedef struct STaskOutputInfo { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8ab8f3852e..25f32195be 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -256,7 +256,10 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { bool finished = false; const char* id = pTask->id.idStr; - qSetStreamOpOpen(exec); + if (!pTask->hTaskInfo.operatorOpen) { + qSetStreamOpOpen(exec); + pTask->hTaskInfo.operatorOpen = true; + } while (1) { if (streamTaskShouldPause(pTask)) {