From c61203b55a98707c12675358c2e6e56b175395e3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 23 Dec 2023 16:37:21 +0800 Subject: [PATCH] fix: fix deadlock. --- source/libs/stream/src/streamMeta.c | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7217a49ff8..46e3dded11 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1065,28 +1065,16 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { SStreamHbMsg hbMsg = {0}; SEpSet epset = {0}; bool hasMnodeEpset = false; - int64_t stage = 0; + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - streamMetaRLock(pMeta); - - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); hbMsg.vgId = pMeta->vgId; - stage = pMeta->stage; - - SArray* pIdList = taosArrayDup(pMeta->pTaskList, NULL); - - streamMetaRUnLock(pMeta); - hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); for (int32_t i = 0; i < numOfTasks; ++i) { - STaskId* pId = taosArrayGet(pIdList, i); + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); - streamMetaRLock(pMeta); SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); - streamMetaRUnLock(pMeta); - if (pTask == NULL) { continue; } @@ -1100,7 +1088,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { .id = *pId, .status = streamTaskGetStatus(*pTask, NULL), .nodeId = hbMsg.vgId, - .stage = stage, + .stage = pMeta->stage, .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), }; @@ -1170,7 +1158,6 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { _end: streamMetaClearHbMsg(&hbMsg); - taosArrayDestroy(pIdList); return TSDB_CODE_SUCCESS; } @@ -1210,7 +1197,9 @@ void metaHbToMnode(void* param, void* tmrId) { } stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER)); + streamMetaRLock(pMeta); metaHeartbeatToMnodeImpl(pMeta); + streamMetaRUnLock(pMeta); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid);