From 6c93fe559344a332b1acd70fee098fe50dd1fe37 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 18:35:00 +0800 Subject: [PATCH] fix(stream): fix dead lock. --- source/libs/stream/src/streamMeta.c | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2f9a579bcc..edc1a148a9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1710,21 +1710,29 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { int32_t code = TSDB_CODE_SUCCESS; + int64_t now = taosGetTimestampMs(); + int64_t startTs = 0; + bool hasFillhistoryTask = false; + STaskId hId = {0}; + + stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId); streamMetaRLock(pMeta); - stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId); STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL) { - STaskCheckInfo* pInfo = &(*ppTask)->taskCheckInfo; - int64_t now = taosGetTimestampMs(); - streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->startTs, now, false); + startTs = (*ppTask)->taskCheckInfo.startTs; + hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask); + hId = (*ppTask)->hTaskInfo.id; - if (HAS_RELATED_FILLHISTORY_TASK(*ppTask)) { - STaskId hId = (*ppTask)->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, pInfo->startTs, now, false); + streamMetaRUnLock(pMeta); + + // add the failed task info, along with the related fill-history task info into tasks list. + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); + if (hasFillhistoryTask) { + streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); } } else { stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", @@ -1732,6 +1740,5 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta code = TSDB_CODE_STREAM_TASK_NOT_EXIST; } - streamMetaRUnLock(pMeta); return code; } \ No newline at end of file