fix(stream): fix dead lock caused by refactor.

This commit is contained in:
Haojun Liao 2024-07-31 19:15:17 +08:00
parent c449299e46
commit d364397e0d
2 changed files with 11 additions and 5 deletions

View File

@ -390,6 +390,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
if (p == NULL) { if (p == NULL) {
streamMutexUnlock(&pInfo->lock);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -433,6 +434,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo == NULL) { if (pReadyInfo == NULL) {
streamMutexUnlock(&pInfo->lock);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -447,6 +449,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo == NULL) { if (pReadyInfo == NULL) {
streamMutexUnlock(&pInfo->lock);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -830,6 +833,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
if (pNotSendList == NULL) { if (pNotSendList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
streamMutexUnlock(&pActiveInfo->lock);
return; return;
} }
@ -938,13 +942,14 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
streamMutexLock(&pInfo->lock); streamMutexLock(&pInfo->lock);
if (!pInfo->dispatchTrigger) { if (!pInfo->dispatchTrigger) {
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pInfo->lock);
return false; return false;
} }
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (pSendInfo == NULL) { if (pSendInfo == NULL) {
streamMutexUnlock(&pInfo->lock);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -964,11 +969,11 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId); id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId);
} }
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pInfo->lock);
return true; return true;
} }
ASSERT(0); streamMutexUnlock(&pInfo->lock);
return false; return false;
} }
@ -1028,6 +1033,7 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (p == NULL) { if (p == NULL) {
streamMutexUnlock(&pInfo->lock);
return num; return num;
} }

View File

@ -907,8 +907,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i); STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
SRpcMsg msg = {0}; SRpcMsg msg = {0};
int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, pInfo->checkpointId, int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId,
&msg); pInfo->checkpointId, &msg);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = tmsgSendReq(&pInfo->upstreamNodeEpset, &msg); code = tmsgSendReq(&pInfo->upstreamNodeEpset, &msg);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {