fix: fix deadlock.
This commit is contained in:
parent
8a714db17d
commit
c61203b55a
|
@ -1065,28 +1065,16 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
SStreamHbMsg hbMsg = {0};
|
SStreamHbMsg hbMsg = {0};
|
||||||
SEpSet epset = {0};
|
SEpSet epset = {0};
|
||||||
bool hasMnodeEpset = false;
|
bool hasMnodeEpset = false;
|
||||||
int64_t stage = 0;
|
|
||||||
|
|
||||||
streamMetaRLock(pMeta);
|
|
||||||
|
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
|
|
||||||
hbMsg.vgId = pMeta->vgId;
|
hbMsg.vgId = pMeta->vgId;
|
||||||
stage = pMeta->stage;
|
|
||||||
|
|
||||||
SArray* pIdList = taosArrayDup(pMeta->pTaskList, NULL);
|
|
||||||
|
|
||||||
streamMetaRUnLock(pMeta);
|
|
||||||
|
|
||||||
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
|
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
|
||||||
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
|
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
STaskId* pId = taosArrayGet(pIdList, i);
|
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
|
||||||
streamMetaRLock(pMeta);
|
|
||||||
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
||||||
streamMetaRUnLock(pMeta);
|
|
||||||
|
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1100,7 +1088,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
.id = *pId,
|
.id = *pId,
|
||||||
.status = streamTaskGetStatus(*pTask, NULL),
|
.status = streamTaskGetStatus(*pTask, NULL),
|
||||||
.nodeId = hbMsg.vgId,
|
.nodeId = hbMsg.vgId,
|
||||||
.stage = stage,
|
.stage = pMeta->stage,
|
||||||
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
|
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1170,7 +1158,6 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
streamMetaClearHbMsg(&hbMsg);
|
streamMetaClearHbMsg(&hbMsg);
|
||||||
taosArrayDestroy(pIdList);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1210,7 +1197,9 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
|
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
|
||||||
|
streamMetaRLock(pMeta);
|
||||||
metaHeartbeatToMnodeImpl(pMeta);
|
metaHeartbeatToMnodeImpl(pMeta);
|
||||||
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
|
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
|
||||||
taosReleaseRef(streamMetaId, rid);
|
taosReleaseRef(streamMetaId, rid);
|
||||||
|
|
Loading…
Reference in New Issue