From 73d75aac25e234f2c3aa7873a99998f8850170b5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 18:18:58 +0800 Subject: [PATCH] fix(stream): add ref for task. --- source/libs/stream/src/streamStart.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 9044e65eb6..18a8106443 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -98,8 +98,10 @@ static void doReExecScanhistory(void* param, void* tmrId) { char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { + streamMetaReleaseTask(pTask->pMeta, pTask); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref); + return; } if (pTask->schedHistoryInfo.numOfTicks <= 0) { @@ -108,6 +110,9 @@ static void doReExecScanhistory(void* param, void* tmrId) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr, pTask->info.fillHistory, ref); + + // release the task. + streamMetaReleaseTask(pTask->pMeta, pTask); } else { taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, &pTask->schedHistoryInfo.pTimer); @@ -122,6 +127,10 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) numOfTicks = SCANHISTORY_IDLE_TICK; } + // add ref for task + SStreamTask* p = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); + ASSERT(p != NULL); + pTask->schedHistoryInfo.numOfTicks = numOfTicks; int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);