diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7f65ef8358..9f88672231 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -402,7 +402,8 @@ typedef struct SHistoryTaskInfo { int32_t tickCount; int32_t retryTimes; int32_t waitInterval; - int64_t haltVer; // offset in wal when halt the stream task + int64_t haltVer; // offset in wal when halt the stream task + bool operatorOpen; // false by default } SHistoryTaskInfo; typedef struct STaskOutputInfo { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8ab8f3852e..25f32195be 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -256,7 +256,10 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { bool finished = false; const char* id = pTask->id.idStr; - qSetStreamOpOpen(exec); + if (!pTask->hTaskInfo.operatorOpen) { + qSetStreamOpOpen(exec); + pTask->hTaskInfo.operatorOpen = true; + } while (1) { if (streamTaskShouldPause(pTask)) {