diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index add893c8c7..32d6dc65d9 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -62,6 +62,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); +int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 35612eb180..5bb8b65b0b 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -485,6 +485,12 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory return 0; } +int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) { + void* exec = pTask->exec.pExecutor; + qResetStreamInfoTimeWindow(exec); + return 0; +} + int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY); SStreamMeta* pMeta = pTask->pMeta;