diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b0bffac9ee..e6d5f023d6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -882,19 +882,33 @@ void metaHbToMnode(void* param, void* tmrId) { stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER)); SStreamHbMsg hbMsg = {0}; + SEpSet epset = {0}; + bool hasMnodeEpset = false; + int32_t stage = 0; + streamMetaRLock(pMeta); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - - SEpSet epset = {0}; - bool hasMnodeEpset = false; - hbMsg.vgId = pMeta->vgId; + stage = pMeta->stage; + + SArray* pIdList = taosArrayDup(pMeta->pTaskList, NULL); + + streamMetaRUnLock(pMeta); + hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); for (int32_t i = 0; i < numOfTasks; ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId* pId = taosArrayGet(pIdList, i); + + streamMetaRLock(pMeta); SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); + streamMetaRUnLock(pMeta); + + if (pTask == NULL) { + continue; + } // not report the status of fill-history task if ((*pTask)->info.fillHistory == 1) { @@ -904,12 +918,12 @@ void metaHbToMnode(void* param, void* tmrId) { STaskStatusEntry entry = { .id = *pId, .status = streamTaskGetStatus(*pTask, NULL), - .nodeId = pMeta->vgId, - .stage = pMeta->stage, + .nodeId = hbMsg.vgId, + .stage = stage, .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), }; - entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; + entry.inputRate = entry.inputQUsed * 100.0 / STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); @@ -928,9 +942,9 @@ void metaHbToMnode(void* param, void* tmrId) { taosThreadMutexLock(&(*pTask)->lock); int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList); for (int j = 0; j < num; ++j) { - int32_t *pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); + int32_t* pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); - bool exist = false; + bool exist = false; int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes); for (int k = 0; k < numOfExisted; ++k) { if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) { @@ -955,7 +969,6 @@ void metaHbToMnode(void* param, void* tmrId) { } hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); - streamMetaRUnLock(pMeta); if (hasMnodeEpset) { int32_t code = 0; @@ -1091,24 +1104,20 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { } void streamMetaRLock(SStreamMeta* pMeta) { - stDebug("vgId:%d meta-rlock", pMeta->vgId); + stTrace("vgId:%d meta-rlock", pMeta->vgId); taosRLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-rlock done", pMeta->vgId); } void streamMetaRUnLock(SStreamMeta* pMeta) { - stDebug("vgId:%d meta-runlock", pMeta->vgId); + stTrace("vgId:%d meta-runlock", pMeta->vgId); taosRUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-runlock done", pMeta->vgId); } void streamMetaWLock(SStreamMeta* pMeta) { - stDebug("vgId:%d meta-wlock", pMeta->vgId); + stTrace("vgId:%d meta-wlock", pMeta->vgId); taosWLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wlock done", pMeta->vgId); } void streamMetaWUnLock(SStreamMeta* pMeta) { - stDebug("vgId:%d meta-wunlock", pMeta->vgId); + stTrace("vgId:%d meta-wunlock", pMeta->vgId); taosWUnLockLatch(&pMeta->lock); - stDebug("vgId:%d meta-wunlock done", pMeta->vgId); }