From 25f798e42f24c90917192ce30c705053d156d43b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 23 Dec 2023 23:29:30 +0800 Subject: [PATCH] fix(stream): record the start failure tasks --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 -- source/libs/stream/src/streamMeta.c | 20 ++++++++++++++------ source/libs/stream/src/streamStart.c | 7 ++++--- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index af9363e363..f576345f64 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -661,8 +661,6 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { if (isLeader && !tsDisableStream) { tqStreamTaskResetStatus(pMeta); streamMetaWUnLock(pMeta); - tqInfo("vgId:%d start all stream tasks after reload tasks from disk", vgId); - streamMetaStartAllTasks(pMeta); } else { streamMetaResetStartInfo(&pMeta->startInfo); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 46e3dded11..a46853fc26 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1429,8 +1429,9 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); + stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); if (numOfTasks == 0) { + stInfo("vgId:%d start tasks completed", pMeta->vgId); return TSDB_CODE_SUCCESS; } @@ -1443,16 +1444,20 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { streamMetaWUnLock(pMeta); // broadcast the check downstream tasks msg + int64_t now = taosGetTimestampMs(); + for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { - streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0, - taosGetTimestampMs(), false); + stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); + streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false); continue; } // fill-history task can only be launched by related stream tasks. + STaskExecStatisInfo* pInfo = &pTask->execInfo; if (pTask->info.fillHistory == 1) { streamMetaReleaseTask(pMeta, pTask); continue; @@ -1465,8 +1470,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { streamLaunchFillHistoryTask(pTask); } - streamMetaUpdateTaskDownstreamStatus(pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, - pTask->execInfo.start, true); + streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -1474,12 +1478,16 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); if (ret != TSDB_CODE_SUCCESS) { + stError("vgId:%d failed to handle event:%d", pMeta->vgId, event); code = ret; + + streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false); } streamMetaReleaseTask(pMeta, pTask); } + stInfo("vgId:%d start tasks completed", pMeta->vgId); taosArrayDestroy(pTaskList); return code; } \ No newline at end of file diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 3bef6adf57..267dadf109 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1114,9 +1114,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; - stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64 + stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 ", readyTs:%" PRId64 " total elapsed time:%.2fs", - pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs, + pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); // print the initialization elapsed time and info @@ -1128,7 +1128,8 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI pStartInfo->completeFn(pMeta); } else { streamMetaWUnLock(pMeta); - stDebug("vgId:%d recv check downstream results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); + stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId, + ready, numOfRecv, numOfTotal); } return TSDB_CODE_SUCCESS;