fix(stream): fix dead-lock

This commit is contained in:
Haojun Liao 2023-11-01 19:09:21 +08:00
parent ae25f09c45
commit e7609d8e56
1 changed files with 28 additions and 19 deletions

View File

@ -882,19 +882,33 @@ void metaHbToMnode(void* param, void* tmrId) {
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
SStreamHbMsg hbMsg = {0};
SEpSet epset = {0};
bool hasMnodeEpset = false;
int32_t stage = 0;
streamMetaRLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SEpSet epset = {0};
bool hasMnodeEpset = false;
hbMsg.vgId = pMeta->vgId;
stage = pMeta->stage;
SArray* pIdList = taosArrayDup(pMeta->pTaskList, NULL);
streamMetaRUnLock(pMeta);
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId* pId = taosArrayGet(pIdList, i);
streamMetaRLock(pMeta);
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
streamMetaRUnLock(pMeta);
if (pTask == NULL) {
continue;
}
// not report the status of fill-history task
if ((*pTask)->info.fillHistory == 1) {
@ -904,12 +918,12 @@ void metaHbToMnode(void* param, void* tmrId) {
STaskStatusEntry entry = {
.id = *pId,
.status = streamTaskGetStatus(*pTask, NULL),
.nodeId = pMeta->vgId,
.stage = pMeta->stage,
.nodeId = hbMsg.vgId,
.stage = stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
};
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
entry.inputRate = entry.inputQUsed * 100.0 / STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
@ -928,9 +942,9 @@ void metaHbToMnode(void* param, void* tmrId) {
taosThreadMutexLock(&(*pTask)->lock);
int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList);
for (int j = 0; j < num; ++j) {
int32_t *pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
int32_t* pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
bool exist = false;
bool exist = false;
int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes);
for (int k = 0; k < numOfExisted; ++k) {
if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) {
@ -955,7 +969,6 @@ void metaHbToMnode(void* param, void* tmrId) {
}
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
streamMetaRUnLock(pMeta);
if (hasMnodeEpset) {
int32_t code = 0;
@ -1091,24 +1104,20 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
}
void streamMetaRLock(SStreamMeta* pMeta) {
stDebug("vgId:%d meta-rlock", pMeta->vgId);
stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosRLockLatch(&pMeta->lock);
stDebug("vgId:%d meta-rlock done", pMeta->vgId);
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
stDebug("vgId:%d meta-runlock", pMeta->vgId);
stTrace("vgId:%d meta-runlock", pMeta->vgId);
taosRUnLockLatch(&pMeta->lock);
stDebug("vgId:%d meta-runlock done", pMeta->vgId);
}
void streamMetaWLock(SStreamMeta* pMeta) {
stDebug("vgId:%d meta-wlock", pMeta->vgId);
stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosWLockLatch(&pMeta->lock);
stDebug("vgId:%d meta-wlock done", pMeta->vgId);
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
stDebug("vgId:%d meta-wunlock", pMeta->vgId);
stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosWUnLockLatch(&pMeta->lock);
stDebug("vgId:%d meta-wunlock done", pMeta->vgId);
}