fix(stream): fix deadlock

This commit is contained in:
Haojun Liao 2024-06-21 23:40:03 +08:00
parent 82febd30a0
commit 94f3c6ec2d
2 changed files with 4 additions and 1 deletions

View File

@ -630,6 +630,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) { int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_DISABLE) { if (type == DATA_UPLOAD_DISABLE) {
stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr);
return 0; return 0;
} }

View File

@ -795,7 +795,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
pActiveInfo->sendReadyCheckCounter = 0; pActiveInfo->sendReadyCheckCounter = 0;
stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id); stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id);
taosThreadMutexLock(&pActiveInfo->lock); taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) { if (pState->state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
@ -806,6 +806,8 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
return; return;
} }
taosThreadMutexLock(&pActiveInfo->lock);
SArray* pList = pActiveInfo->pReadyMsgList; SArray* pList = pActiveInfo->pReadyMsgList;
SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t)); SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t));