diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b4829d1dd8..0e5b1b6fb7 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -365,6 +365,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { code = streamTrySchedExec(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); + taosArrayDestroy(pTaskList); return -1; } } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 5e67f1766f..55209fc427 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -280,6 +280,13 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { streamMutexLock(&pInfo->checkInfoLock); + // drop procedure already started, not start check downstream now + ETaskStatus s = streamTaskGetStatus(pTask).state; + if (s == TASK_STATUS__DROPPING) { + streamMutexUnlock(&pInfo->checkInfoLock); + return; + } + int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { streamMutexUnlock(&pInfo->checkInfoLock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 42d2f86dac..6f3b7d8b32 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -742,7 +742,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamMetaRLock(pMeta); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { + // to make sure check status will not start the check downstream status when we start to check timerActive count. + streamMutexLock(&pTask->taskCheckInfo.checkInfoLock); timerActive = (*ppTask)->status.timerActive; + streamMutexUnlock(&pTask->taskCheckInfo.checkInfoLock); } streamMetaRUnLock(pMeta);