diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a115783b70..04694b05fd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -872,7 +872,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, +int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7883c858f0..5a92677462 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -526,7 +526,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe streamMetaRUnLock(pMeta); if (hasHistoryTask) { - streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); } tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, @@ -539,7 +539,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; } - streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); return code; } @@ -553,13 +553,13 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe streamMetaRUnLock(pMeta); if (hasHistoryTask) { - streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); } } else { streamMetaRUnLock(pMeta); } - streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", rsp.streamId, rsp.upstreamTaskId, vgId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 68348f462f..ee2b70eebe 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1466,7 +1466,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false); continue; } @@ -1487,7 +1487,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { streamLaunchFillHistoryTask(pTask); } - streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -1497,10 +1497,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); code = ret; - streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaUpdateTaskDownstreamStatus(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); } } @@ -1519,7 +1519,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId); - streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false); + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } @@ -1538,10 +1538,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas if (ret != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); - streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->init, pInfo->start, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaUpdateTaskDownstreamStatus(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 24909a9776..27ed6af402 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -26,6 +26,7 @@ typedef struct SLaunchHTaskInfo { SStreamMeta* pMeta; STaskId id; + STaskId hTaskId; } SLaunchHTaskInfo; typedef struct STaskRecheckInfo { @@ -43,7 +44,8 @@ typedef struct STaskInitTs { static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); -static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId, + int32_t hTaskId); static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); @@ -394,7 +396,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { int64_t initTs = pTask->execInfo.init; int64_t startTs = pTask->execInfo.start; - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); // start the related fill-history task, when current task is ready // not invoke in success callback due to the deadlock. @@ -492,12 +494,12 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t startTs = pTask->execInfo.init; int64_t now = taosGetTimestampMs(); - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); + streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); @@ -731,21 +733,27 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) static void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; + int64_t now = taosGetTimestampMs(); streamMetaWLock(pMeta); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); - if (ppTask) { + if (ppTask != NULL) { ASSERT((*ppTask)->status.timerActive >= 1); if (streamTaskShouldStop(*ppTask)) { char* p = streamTaskGetStatus(*ppTask)->name; + int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); + streamMetaWUnLock(pMeta); + + // record the related fill-history task failed + STaskId* pHTaskId = &(*ppTask)->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pMeta, pHTaskId->streamId, pHTaskId->taskId, 0, now, false); taosMemoryFree(pInfo); - streamMetaWUnLock(pMeta); return; } } @@ -764,8 +772,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - streamMetaReleaseTask(pMeta, pTask); + streamMetaAddTaskLaunchResult(pMeta, pInfo->id.streamId, pInfo->id.taskId, 0, now, false); + streamMetaReleaseTask(pMeta, pTask); stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d", pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref); @@ -798,91 +807,132 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, pHTaskInfo->retryTimes, ref); + streamMetaReleaseTask(pMeta, pTask); } } else { - stError("s-task:0x%x failed to load task, it may have been destroyed, not launch related fill-history task", + streamMetaAddTaskLaunchResult(pMeta, pInfo->id.streamId, pInfo->id.taskId, 0, now, false); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:0x%x failed to load fill-history task, it may have been destroyed, not launch fill-history task", (int32_t)pInfo->id.taskId); } taosMemoryFree(pInfo); } -SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { +SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId, + int32_t hTaskId) { SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo)); if (pInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pInfo->id.taskId = taskId; pInfo->id.streamId = streamId; + pInfo->id.taskId = taskId; + + pInfo->hTaskId.streamId = hStreamId; + pInfo->hTaskId.taskId = hTaskId; + pInfo->pMeta = pMeta; return pInfo; } -// an fill history task needs to be started. -int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; - - int64_t streamId = pTask->hTaskInfo.id.streamId; - int32_t hTaskId = pTask->hTaskInfo.id.taskId; +static int32_t handleNotBuiltFillHistoryTask(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + STaskExecStatisInfo* pExecInfo = &pTask->execInfo; + const char* idStr = pTask->id.idStr; + int64_t hStreamId = pTask->hTaskInfo.id.streamId; + int32_t hTaskId = pTask->hTaskInfo.id.taskId; ASSERT(hTaskId != 0); - SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - if (pStatus->state != TASK_STATUS__READY) { - STaskExecStatisInfo* pInfo = &pTask->execInfo; - stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", pTask->id.idStr, - pTask->hTaskInfo.id.streamId, hTaskId, pStatus->name); + stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId); - streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, hTaskId, pInfo->init, pInfo->start, false); - return -1;// todo set the correct error code - } else { - stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, - pTask->hTaskInfo.id.streamId, hTaskId); + SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId); + if (pInfo == NULL) { + stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr); + + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + return terrno; } - // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); - if (pHTask == NULL) { - stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", pTask->id.idStr, pMeta->vgId, - hTaskId); + // set the launch time info + streamTaskInitForLaunchHTask(&pTask->hTaskInfo); - SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); - if (pInfo == NULL) { - stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", pTask->id.idStr); + // check for the timer + if (pTask->hTaskInfo.pTimer == NULL) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer); + + if (pTask->hTaskInfo.pTimer == NULL) { + ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, + pTask->status.timerActive); + + taosMemoryFree(pInfo); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); return terrno; } - streamTaskInitForLaunchHTask(&pTask->hTaskInfo); - if (pTask->hTaskInfo.pTimer == NULL) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer); - if (pTask->hTaskInfo.pTimer == NULL) { - atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr, - pTask->status.timerActive); - taosMemoryFree(pInfo); - } else { - ASSERT(ref >= 1); - stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref); + ASSERT(ref >= 1); + + stDebug("s-task:%s set timer active flag, ref:%d", idStr, ref); + } else { // timer exists + ASSERT(pTask->status.timerActive >= 1); + stDebug("s-task:%s set timer active flag, task timer not null", idStr); + taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer); + } + + return TSDB_CODE_SUCCESS; +} + +// an fill history task needs to be started. +int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + STaskExecStatisInfo* pExecInfo = &pTask->execInfo; + const char* idStr = pTask->id.idStr; + int64_t hStreamId = pTask->hTaskInfo.id.streamId; + int32_t hTaskId = pTask->hTaskInfo.id.taskId; + ASSERT(hTaskId != 0); + + // check stream task status in the first place. + SStreamTaskState* pStatus = streamTaskGetStatus(pTask); + if (pStatus->state != TASK_STATUS__READY) { + stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, + pStatus->name); + + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + return -1; // todo set the correct error code + } + + stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId); + + // Set the execute conditions, including the query time window and the version range + streamMetaRLock(pMeta); + SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); + streamMetaRUnLock(pMeta); + + if (pHTask != NULL) { + SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); + if (pHisTask == NULL) { + stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped", idStr); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + } else { + if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing + stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, true); + } else { // exist, but not ready, continue check downstream task status + checkFillhistoryTaskStatus(pTask, pHisTask); } - } else { // timer exists - ASSERT(pTask->status.timerActive >= 1); - stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); - taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer); + + streamMetaReleaseTask(pMeta, pHisTask); } return TSDB_CODE_SUCCESS; } - if ((*pHTask)->status.downstreamReady == 1) { - stDebug("s-task:%s fill-history task is ready, no need to check downstream", (*pHTask)->id.idStr); - } else { - checkFillhistoryTaskStatus(pTask, *pHTask); - } - - return TSDB_CODE_SUCCESS; + return handleNotBuiltFillHistoryTask(pTask); } int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { @@ -1114,7 +1164,7 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) } } -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, +int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready) { STaskStartInfo* pStartInfo = &pMeta->startInfo; STaskId id = {.streamId = streamId, .taskId = taskId};