fix(stream): fix dead-lock.

This commit is contained in:
Haojun Liao 2024-06-03 19:14:45 +08:00
parent 3b3ed1c30c
commit 407f79cfa6
1 changed files with 11 additions and 13 deletions

View File

@ -332,17 +332,15 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
// 1. not in checkpoint status now // 1. not in checkpoint status now
SStreamTaskState* pStat = streamTaskGetStatus(pTask); SStreamTaskState* pStat = streamTaskGetStatus(pTask);
if (pStat->state != TASK_STATUS__CK) { if (pStat->state != TASK_STATUS__CK) {
stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat->name, downstreamTaskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS; return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
} }
// 2. expired checkpoint-ready msg // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg
if (pTask->chkInfo.checkpointId > checkpointId) { if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) {
// discard it directly stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
return -1; ") from task:0x%x, expired and discard ",
} id, pStat->name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
// invalid checkpoint-ready msg
if (pInfo->activeId != checkpointId) {
return -1; return -1;
} }
@ -372,13 +370,14 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
} }
int32_t notReady = total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList); int32_t notReady = total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
int32_t transId = pInfo->transId;
taosThreadMutexUnlock(&pInfo->lock);
if (notReady == 0) { if (notReady == 0) {
stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", id);
id); appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId);
} }
taosThreadMutexUnlock(&pInfo->lock);
return 0; return 0;
} }
@ -944,7 +943,6 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int32_t taskId = 0; int32_t taskId = 0;
taosThreadMutexLock(&pInfo->lock); taosThreadMutexLock(&pInfo->lock);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {