fix(stream): make sure open operator only once for scan-history task.
This commit is contained in:
parent
999afd97e5
commit
30d78c6be1
|
@ -402,7 +402,8 @@ typedef struct SHistoryTaskInfo {
|
||||||
int32_t tickCount;
|
int32_t tickCount;
|
||||||
int32_t retryTimes;
|
int32_t retryTimes;
|
||||||
int32_t waitInterval;
|
int32_t waitInterval;
|
||||||
int64_t haltVer; // offset in wal when halt the stream task
|
int64_t haltVer; // offset in wal when halt the stream task
|
||||||
|
bool operatorOpen; // false by default
|
||||||
} SHistoryTaskInfo;
|
} SHistoryTaskInfo;
|
||||||
|
|
||||||
typedef struct STaskOutputInfo {
|
typedef struct STaskOutputInfo {
|
||||||
|
|
|
@ -256,7 +256,10 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
||||||
bool finished = false;
|
bool finished = false;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
qSetStreamOpOpen(exec);
|
if (!pTask->hTaskInfo.operatorOpen) {
|
||||||
|
qSetStreamOpOpen(exec);
|
||||||
|
pTask->hTaskInfo.operatorOpen = true;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldPause(pTask)) {
|
if (streamTaskShouldPause(pTask)) {
|
||||||
|
|
Loading…
Reference in New Issue