fix(stream): avoid dead lock.

This commit is contained in:
Haojun Liao 2023-12-20 18:07:53 +08:00
parent 43fb7cf14a
commit 849aaf8d4f
2 changed files with 15 additions and 7 deletions

View File

@ -1215,6 +1215,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->status.timerActive >= 1) { if (pTask->status.timerActive >= 1) {
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart", pTask->id.idStr, pMeta->vgId);
inTimer = true; inTimer = true;
} }
} }

View File

@ -48,9 +48,13 @@ static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doProcessDownstreamReadyRsp(SStreamTask* pTask); static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t streamTaskSetReady(SStreamTask* pTask) {
char* p = NULL; SStreamMeta* pMeta = pTask->pMeta;
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); SStreamTaskId id = pTask->id;
ETaskStatus status = streamTaskGetStatus(pTask, &p); int64_t initTs = pTask->execInfo.init;
int64_t startTs = pTask->execInfo.start;
char* p = NULL;
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if ((status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY) && if ((status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY) &&
pTask->info.taskLevel != TASK_LEVEL__SOURCE) { pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
@ -67,8 +71,12 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfDowns, el, p); pTask->id.idStr, numOfDowns, el, p);
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, taosThreadMutexUnlock(&pTask->lock);
pTask->execInfo.start, true);
// to avoid deadlock
streamMetaUpdateTaskDownstreamStatus(pMeta, id.streamId, id.taskId, initTs, startTs, true);
taosThreadMutexLock(&pTask->lock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -461,8 +469,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
", current stage:%" PRId64 ", current stage:%" PRId64
", " ", not check wait for downstream task nodeUpdate, and all tasks restart",
"not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
} else { } else {