diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index cb940aa56c..3b7fe3cfdd 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -612,9 +612,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen streamMetaReleaseTask(pMeta, pTask); } - streamMetaWLock(pMeta); - streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, false); - streamMetaWUnLock(pMeta); + streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, true); // drop the stream task now streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a48dbb574c..f3494377d6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -763,11 +763,11 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool SStreamMeta* pMeta = pTask->pMeta; STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; if (pTask->info.fillHistory == 0) { - return 0; + return TSDB_CODE_SUCCESS; } if (metaLock) { - streamMetaWLock(pTask->pMeta); + streamMetaWLock(pMeta); } SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId)); @@ -788,7 +788,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool } if (metaLock) { - streamMetaWUnLock(pTask->pMeta); + streamMetaWUnLock(pMeta); } return TSDB_CODE_SUCCESS;