fix(stream): record the start failure tasks
This commit is contained in:
parent
c61203b55a
commit
25f798e42f
|
@ -661,8 +661,6 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
|||
if (isLeader && !tsDisableStream) {
|
||||
tqStreamTaskResetStatus(pMeta);
|
||||
streamMetaWUnLock(pMeta);
|
||||
tqInfo("vgId:%d start all stream tasks after reload tasks from disk", vgId);
|
||||
|
||||
streamMetaStartAllTasks(pMeta);
|
||||
} else {
|
||||
streamMetaResetStartInfo(&pMeta->startInfo);
|
||||
|
|
|
@ -1429,8 +1429,9 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
int32_t vgId = pMeta->vgId;
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
stDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
|
||||
stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
|
||||
if (numOfTasks == 0) {
|
||||
stInfo("vgId:%d start tasks completed", pMeta->vgId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1443,16 +1444,20 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// broadcast the check downstream tasks msg
|
||||
int64_t now = taosGetTimestampMs();
|
||||
|
||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
if (pTask == NULL) {
|
||||
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0,
|
||||
taosGetTimestampMs(), false);
|
||||
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
||||
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false);
|
||||
continue;
|
||||
}
|
||||
|
||||
// fill-history task can only be launched by related stream tasks.
|
||||
STaskExecStatisInfo* pInfo = &pTask->execInfo;
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
continue;
|
||||
|
@ -1465,8 +1470,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
streamLaunchFillHistoryTask(pTask);
|
||||
}
|
||||
|
||||
streamMetaUpdateTaskDownstreamStatus(pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
|
||||
pTask->execInfo.start, true);
|
||||
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
@ -1474,12 +1478,16 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
|
||||
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
stError("vgId:%d failed to handle event:%d", pMeta->vgId, event);
|
||||
code = ret;
|
||||
|
||||
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
||||
stInfo("vgId:%d start tasks completed", pMeta->vgId);
|
||||
taosArrayDestroy(pTaskList);
|
||||
return code;
|
||||
}
|
|
@ -1114,9 +1114,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
|
|||
pStartInfo->readyTs = taosGetTimestampMs();
|
||||
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
|
||||
|
||||
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64
|
||||
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
|
||||
", readyTs:%" PRId64 " total elapsed time:%.2fs",
|
||||
pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs,
|
||||
pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
|
||||
pStartInfo->elapsedTime / 1000.0);
|
||||
|
||||
// print the initialization elapsed time and info
|
||||
|
@ -1128,7 +1128,8 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
|
|||
pStartInfo->completeFn(pMeta);
|
||||
} else {
|
||||
streamMetaWUnLock(pMeta);
|
||||
stDebug("vgId:%d recv check downstream results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
|
||||
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId,
|
||||
ready, numOfRecv, numOfTotal);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
Loading…
Reference in New Issue