diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 970af07aa1..31bf5a482b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -532,6 +532,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t taosWLockLatch(&pMeta->lock); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { + // it is an fill-history task, remove the related stream task's id that points to it + if ((*ppTask)->info.fillHistory == 1) { + STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; + SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId)); + if (ppStreamTask != NULL) { + (*ppStreamTask)->historyTaskId.taskId = 0; + (*ppStreamTask)->historyTaskId.streamId = 0; + } + } + taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); @@ -545,16 +555,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamMetaReleaseTask(pMeta, pTask); } - // it is an fill-history task, remove the related stream task's id that points to it - if ((*ppTask)->info.fillHistory == 1) { - STaskId id1 = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; - SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id1, sizeof(id1)); - if (ppStreamTask != NULL) { - (*ppStreamTask)->historyTaskId.taskId = 0; - (*ppStreamTask)->historyTaskId.streamId = 0; - } - } - streamMetaRemoveTask(pMeta, &id); streamMetaReleaseTask(pMeta, pTask); } else {