From 407f79cfa69c08aec0822b3ab4cb8520b4f68a90 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Jun 2024 19:14:45 +0800 Subject: [PATCH] fix(stream): fix dead-lock. --- source/libs/stream/src/streamCheckpoint.c | 24 +++++++++++------------ 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b0c4884c73..94d2198e31 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -332,17 +332,15 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId // 1. not in checkpoint status now SStreamTaskState* pStat = streamTaskGetStatus(pTask); if (pStat->state != TASK_STATUS__CK) { + stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat->name, downstreamTaskId); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } - // 2. expired checkpoint-ready msg - if (pTask->chkInfo.checkpointId > checkpointId) { - // discard it directly - return -1; - } - - // invalid checkpoint-ready msg - if (pInfo->activeId != checkpointId) { + // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg + if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) { + stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64 + ") from task:0x%x, expired and discard ", + id, pStat->name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId); return -1; } @@ -372,13 +370,14 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId } int32_t notReady = total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList); + int32_t transId = pInfo->transId; + taosThreadMutexUnlock(&pInfo->lock); + if (notReady == 0) { - stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", - id); - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId); + stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", id); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId); } - taosThreadMutexUnlock(&pInfo->lock); return 0; } @@ -944,7 +943,6 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; int32_t taskId = 0; - taosThreadMutexLock(&pInfo->lock); for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {