From 8d848a5bf0edc5a17efab83f131560e2b2e30cb5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 21 Feb 2024 09:40:22 +0800 Subject: [PATCH 1/2] fix(stream): add null ptr check. --- source/libs/stream/src/streamTask.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 281e2ed550..a48dbb574c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -756,6 +756,10 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) { } int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool metaLock) { + if (pTask == NULL) { + return TSDB_CODE_SUCCESS; + } + SStreamMeta* pMeta = pTask->pMeta; STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; if (pTask->info.fillHistory == 0) { From d6f5ae226b62065a0535a6dd4d2a2dbbe0652b07 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 21 Feb 2024 09:42:23 +0800 Subject: [PATCH 2/2] refactor: do some internal refactor. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 +--- source/libs/stream/src/streamTask.c | 6 +++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index cb940aa56c..3b7fe3cfdd 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -612,9 +612,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen streamMetaReleaseTask(pMeta, pTask); } - streamMetaWLock(pMeta); - streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, false); - streamMetaWUnLock(pMeta); + streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, true); // drop the stream task now streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a48dbb574c..f3494377d6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -763,11 +763,11 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool SStreamMeta* pMeta = pTask->pMeta; STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; if (pTask->info.fillHistory == 0) { - return 0; + return TSDB_CODE_SUCCESS; } if (metaLock) { - streamMetaWLock(pTask->pMeta); + streamMetaWLock(pMeta); } SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId)); @@ -788,7 +788,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool } if (metaLock) { - streamMetaWUnLock(pTask->pMeta); + streamMetaWUnLock(pMeta); } return TSDB_CODE_SUCCESS;