fix(stream): commit the update to make sure the new state of tasks will be persistent to disk.

This commit is contained in:
Haojun Liao 2023-08-07 19:30:23 +08:00
parent 3d2318317d
commit 2da67392c6
1 changed files with 2 additions and 0 deletions

View File

@ -495,11 +495,13 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pTask);
streamMetaCommit(pMeta);
taosWUnLockLatch(&pMeta->lock);
// history data scan in the stream time window finished, now let's enable the pause
streamTaskEnablePause(pTask);
// for source tasks, let's continue execute.
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamSchedExec(pTask);
}