diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4c4f5e98ab..8e97d05f7f 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -630,6 +630,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_DISABLE) { + stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr); return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c0f559b881..8811bec889 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -795,7 +795,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { pActiveInfo->sendReadyCheckCounter = 0; stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id); - taosThreadMutexLock(&pActiveInfo->lock); + taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state != TASK_STATUS__CK) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); @@ -806,6 +806,8 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { return; } + taosThreadMutexLock(&pActiveInfo->lock); + SArray* pList = pActiveInfo->pReadyMsgList; SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t));