diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a115783b70..04694b05fd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -872,7 +872,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, +int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7883c858f0..5a92677462 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -526,7 +526,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe streamMetaRUnLock(pMeta); if (hasHistoryTask) { - streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); } tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, @@ -539,7 +539,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe code = terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; } - streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); return code; } @@ -553,13 +553,13 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe streamMetaRUnLock(pMeta); if (hasHistoryTask) { - streamMetaUpdateTaskDownstreamStatus(pMeta, fId.streamId, fId.taskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); } } else { streamMetaRUnLock(pMeta); } - streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, rsp.streamId, rsp.upstreamTaskId, initTs, now, false); tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", rsp.streamId, rsp.upstreamTaskId, vgId); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1c9361fa61..cb3f7a3504 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -294,11 +294,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { int32_t code = 0; SCheckpointInfo* pCKInfo = &p->chkInfo; - if (p->info.fillHistory == 1) { - return code; - } - - if (p->info.taskLevel > TASK_LEVEL__SINK) { + // fill-history task, rsma task, and sink task will not generate the checkpoint + if ((p->info.fillHistory == 1) || (p->info.taskLevel > TASK_LEVEL__SINK)) { return code; } @@ -333,7 +330,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name); // save the task if not sink task - if (p->info.taskLevel != TASK_LEVEL__SINK) { + if (p->info.taskLevel < TASK_LEVEL__SINK) { streamMetaWLock(pMeta); code = streamMetaSaveTask(pMeta, p); @@ -451,16 +448,17 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL); } int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t startTs = pTask->chkInfo.startTs; - int64_t ckId = pTask->chkInfo.checkpointingId; + int32_t code = TSDB_CODE_SUCCESS; + int64_t startTs = pTask->chkInfo.startTs; + int64_t ckId = pTask->chkInfo.checkpointingId; + const char* id = pTask->id.idStr; // sink task do not need to save the status, and generated the checkpoint if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - stDebug("s-task:%s level:%d start gen checkpoint", pTask->id.idStr, pTask->info.taskLevel); + stDebug("s-task:%s level:%d start gen checkpoint", id, pTask->info.taskLevel); code = streamBackendDoCheckpoint(pTask->pBackend, ckId); if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, tstrerror(terrno)); + stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); } } @@ -474,39 +472,38 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { // todo: let's retry send rsp to upstream/mnode - stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, - ckId, tstrerror(code)); + stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId, + tstrerror(code)); } } // clear the checkpoint info, and commit the newest checkpoint info if all works are done successfully if (code == TSDB_CODE_SUCCESS) { code = streamSaveTaskCheckpointInfo(pTask, ckId); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, - tstrerror(code)); - } else { - code = streamTaskUploadChkp(pTask, ckId, (char*)pTask->id.idStr); - if (code != 0) { - stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", pTask->id.idStr, ckId); + if (code == TSDB_CODE_SUCCESS) { + code = streamTaskUploadChkp(pTask, ckId, (char*)id); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId); } + } else { + stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code)); } } - if (code != TSDB_CODE_SUCCESS) { // clear the checkpoint info if failed + // clear the checkpoint info if failed + if (code != TSDB_CODE_SUCCESS) { taosThreadMutexLock(&pTask->lock); streamTaskClearCheckInfo(pTask, false); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); taosThreadMutexUnlock(&pTask->lock); streamTaskSetCheckpointFailedId(pTask); - stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, pTask->id.idStr, - ckId); + stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } double el = (taosGetTimestampMs() - startTs) / 1000.0; - stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", - pTask->id.idStr, pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, + stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id, + pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, (code == TSDB_CODE_SUCCESS) ? "succ" : "failed"); return code; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 68348f462f..ee2b70eebe 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1466,7 +1466,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false); continue; } @@ -1487,7 +1487,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { streamLaunchFillHistoryTask(pTask); } - streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -1497,10 +1497,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); code = ret; - streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaUpdateTaskDownstreamStatus(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); } } @@ -1519,7 +1519,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId); - streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false); + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } @@ -1538,10 +1538,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas if (ret != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); - streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->init, pInfo->start, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaUpdateTaskDownstreamStatus(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 24909a9776..84f1dbb4d7 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -26,6 +26,7 @@ typedef struct SLaunchHTaskInfo { SStreamMeta* pMeta; STaskId id; + STaskId hTaskId; } SLaunchHTaskInfo; typedef struct STaskRecheckInfo { @@ -43,7 +44,8 @@ typedef struct STaskInitTs { static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); -static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId, + int32_t hTaskId); static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); @@ -394,7 +396,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { int64_t initTs = pTask->execInfo.init; int64_t startTs = pTask->execInfo.start; - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); // start the related fill-history task, when current task is ready // not invoke in success callback due to the deadlock. @@ -492,12 +494,12 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t startTs = pTask->execInfo.init; int64_t now = taosGetTimestampMs(); - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); + streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); @@ -728,30 +730,79 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST); } +static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { + SStreamMeta* pMeta = pTask->pMeta; + SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); + + stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d", + pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref); + + pHTaskInfo->id.taskId = 0; + pHTaskInfo->id.streamId = 0; +} + +static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) { + SStreamMeta* pMeta = pTask->pMeta; + SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; + + if (streamTaskShouldStop(pTask)) { // record the failure + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64 ", ref:%d", pInfo->id.taskId, + pInfo->hTaskId.taskId, ref); + + streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); + taosMemoryFree(pInfo); + } else { + char* p = streamTaskGetStatus(pTask)->name; + int32_t hTaskId = pHTaskInfo->id.taskId; + + stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", + pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); + + taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer); + } +} + static void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; + int64_t now = taosGetTimestampMs(); streamMetaWLock(pMeta); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); - if (ppTask) { + if (ppTask == NULL || *ppTask == NULL) { + stError("s-task:0x%x and rel fill-history task:0x%" PRIx64 " all have been destroyed, not launch", + (int32_t)pInfo->id.taskId, pInfo->hTaskId.taskId); + streamMetaWUnLock(pMeta); + + // already dropped, no need to set the failure info into the stream task meta. + taosMemoryFree(pInfo); + return; + } + + if (streamTaskShouldStop(*ppTask)) { ASSERT((*ppTask)->status.timerActive >= 1); - if (streamTaskShouldStop(*ppTask)) { - char* p = streamTaskGetStatus(*ppTask)->name; - int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); - stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", - (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); + char* p = streamTaskGetStatus(*ppTask)->name; + int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); + stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", + (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); - taosMemoryFree(pInfo); - streamMetaWUnLock(pMeta); - return; - } + streamMetaWUnLock(pMeta); + + // record the related fill-history task failed + streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); + taosMemoryFree(pInfo); + return; } + + SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pInfo->id.streamId, pInfo->id.taskId); streamMetaWUnLock(pMeta); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); if (pTask != NULL) { SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; @@ -763,128 +814,153 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - streamMetaReleaseTask(pMeta, pTask); - - stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d", - pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref); - - pHTaskInfo->id.taskId = 0; - pHTaskInfo->id.streamId = 0; + noRetryLaunchFillHistoryTask(pTask, pInfo, now); } else { // not reach the limitation yet, let's continue retrying launch related fill-history task. streamTaskSetRetryInfoForLaunch(pHTaskInfo); ASSERT(pTask->status.timerActive >= 1); // abort the timer if intend to stop task SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId); - if (pHTask == NULL && (!streamTaskShouldStop(pTask))) { - char* p = streamTaskGetStatus(pTask)->name; - int32_t hTaskId = pHTaskInfo->id.taskId; - - stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", - pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); - - taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer); + if (pHTask == NULL) { + doRetryLaunchFillHistoryTask(pTask, pInfo, now); streamMetaReleaseTask(pMeta, pTask); return; - } - - if (pHTask != NULL) { + } else { checkFillhistoryTaskStatus(pTask, pHTask); streamMetaReleaseTask(pMeta, pHTask); - } - // not in timer anymore - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, - pHTaskInfo->retryTimes, ref); - streamMetaReleaseTask(pMeta, pTask); + // not in timer anymore + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, + pHTaskInfo->retryTimes, ref); + } } + + streamMetaReleaseTask(pMeta, pTask); } else { - stError("s-task:0x%x failed to load task, it may have been destroyed, not launch related fill-history task", - (int32_t)pInfo->id.taskId); + streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); + + int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); + stError("s-task:0x%x rel fill-history task:0x%" PRIx64 " may have been destroyed, not launch, ref:%d", + (int32_t)pInfo->id.taskId, pInfo->hTaskId.taskId, ref); } taosMemoryFree(pInfo); } -SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { +SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId, + int32_t hTaskId) { SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo)); if (pInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pInfo->id.taskId = taskId; pInfo->id.streamId = streamId; + pInfo->id.taskId = taskId; + + pInfo->hTaskId.streamId = hStreamId; + pInfo->hTaskId.taskId = hTaskId; + pInfo->pMeta = pMeta; return pInfo; } -// an fill history task needs to be started. -int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; - - int64_t streamId = pTask->hTaskInfo.id.streamId; - int32_t hTaskId = pTask->hTaskInfo.id.taskId; +static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + STaskExecStatisInfo* pExecInfo = &pTask->execInfo; + const char* idStr = pTask->id.idStr; + int64_t hStreamId = pTask->hTaskInfo.id.streamId; + int32_t hTaskId = pTask->hTaskInfo.id.taskId; ASSERT(hTaskId != 0); - SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - if (pStatus->state != TASK_STATUS__READY) { - STaskExecStatisInfo* pInfo = &pTask->execInfo; - stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", pTask->id.idStr, - pTask->hTaskInfo.id.streamId, hTaskId, pStatus->name); + stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId); - streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, hTaskId, pInfo->init, pInfo->start, false); - return -1;// todo set the correct error code - } else { - stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, - pTask->hTaskInfo.id.streamId, hTaskId); + SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId); + if (pInfo == NULL) { + stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr); + + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + return terrno; } - // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); - if (pHTask == NULL) { - stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", pTask->id.idStr, pMeta->vgId, - hTaskId); + // set the launch time info + streamTaskInitForLaunchHTask(&pTask->hTaskInfo); - SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); - if (pInfo == NULL) { - stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", pTask->id.idStr); + // check for the timer + if (pTask->hTaskInfo.pTimer == NULL) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer); + + if (pTask->hTaskInfo.pTimer == NULL) { + ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref); + + taosMemoryFree(pInfo); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); return terrno; } - streamTaskInitForLaunchHTask(&pTask->hTaskInfo); - if (pTask->hTaskInfo.pTimer == NULL) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer); - if (pTask->hTaskInfo.pTimer == NULL) { - atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr, - pTask->status.timerActive); - taosMemoryFree(pInfo); - } else { - ASSERT(ref >= 1); - stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref); - } - } else { // timer exists - ASSERT(pTask->status.timerActive >= 1); - stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); - taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer); - } + ASSERT(ref >= 1); - return TSDB_CODE_SUCCESS; - } - - if ((*pHTask)->status.downstreamReady == 1) { - stDebug("s-task:%s fill-history task is ready, no need to check downstream", (*pHTask)->id.idStr); - } else { - checkFillhistoryTaskStatus(pTask, *pHTask); + stDebug("s-task:%s set timer active flag, ref:%d", idStr, ref); + } else { // timer exists + ASSERT(pTask->status.timerActive >= 1); + stDebug("s-task:%s set timer active flag, task timer not null", idStr); + taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer); } return TSDB_CODE_SUCCESS; } +// an fill history task needs to be started. +int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + STaskExecStatisInfo* pExecInfo = &pTask->execInfo; + const char* idStr = pTask->id.idStr; + int64_t hStreamId = pTask->hTaskInfo.id.streamId; + int32_t hTaskId = pTask->hTaskInfo.id.taskId; + ASSERT(hTaskId != 0); + + // check stream task status in the first place. + SStreamTaskState* pStatus = streamTaskGetStatus(pTask); + if (pStatus->state != TASK_STATUS__READY) { + stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, + pStatus->name); + + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + return -1; // todo set the correct error code + } + + stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId); + + // Set the execute conditions, including the query time window and the version range + streamMetaRLock(pMeta); + SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); + streamMetaRUnLock(pMeta); + + if (pHTask != NULL) { // it is already added into stream meta store. + SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); + if (pHisTask == NULL) { + stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + } else { + if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing + stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, true); + } else { // exist, but not ready, continue check downstream task status + checkFillhistoryTaskStatus(pTask, pHisTask); + } + + streamMetaReleaseTask(pMeta, pHisTask); + } + + return TSDB_CODE_SUCCESS; + } else { + return launchNotBuiltFillHistoryTask(pTask); + } +} + int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { if (streamTaskGetStatus(pTask)->state == TASK_STATUS__DROPPING) { return 0; @@ -1114,7 +1190,7 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) } } -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, +int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready) { STaskStartInfo* pStartInfo = &pMeta->startInfo; STaskId id = {.streamId = streamId, .taskId = taskId}; diff --git a/tests/system-test/8-stream/at_once_session.py b/tests/system-test/8-stream/at_once_session.py index 9a253a187f..3462561c4e 100644 --- a/tests/system-test/8-stream/at_once_session.py +++ b/tests/system-test/8-stream/at_once_session.py @@ -59,10 +59,14 @@ class TDTestCase: ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", cast(cast({partition_elm_alias} as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", cast(cast({partition_elm_alias} as bigint) as varchar(20))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None + time.sleep(1) # create stb/ctb/tb stream self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} partition by {partition} {partition_elm_alias} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="at_once", ignore_expired=ignore_expired, ignore_update=ignore_update, subtable_value=ctb_subtable_value, fill_history_value=fill_history_value) self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} partition by {partition} {partition_elm_alias} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="at_once", ignore_expired=ignore_expired, ignore_update=ignore_update, subtable_value=tb_subtable_value, fill_history_value=fill_history_value) + + time.sleep(1) + for i in range(self.tdCom.range_count): ctb_name = self.tdCom.get_long_name() self.tdCom.screate_ctable(stbname=self.stb_name, ctbname=ctb_name) diff --git a/tests/system-test/8-stream/max_delay_interval_ext.py b/tests/system-test/8-stream/max_delay_interval_ext.py index 653fcd997c..6536309a25 100644 --- a/tests/system-test/8-stream/max_delay_interval_ext.py +++ b/tests/system-test/8-stream/max_delay_interval_ext.py @@ -50,6 +50,8 @@ class TDTestCase: # create stb/ctb/tb stream self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ext_stb_stream_des_table, subtable_value=stb_subtable_value, source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value, fill_history_value=fill_history_value, stb_field_name_value=stb_field_name_value, tag_value=tag_value, use_exist_stb=use_exist_stb) + time.sleep(1) + init_num = 0 for i in range(self.tdCom.range_count): if i == 0: