From 6721ec3cd83964cb820a4f49cceb26ed2e436610 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 May 2024 14:06:25 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamCheckStatus.c | 59 ++++++++++++++++++--- source/libs/stream/src/streamMeta.c | 34 ++++++------ source/libs/stream/src/streamStartHistory.c | 50 ----------------- 4 files changed, 68 insertions(+), 76 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index fe6a7de734..3c5d6d6e4c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -707,6 +707,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int64_t endTs, bool ready); int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs); void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, int64_t startTs); void streamMetaRLock(SStreamMeta* pMeta); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 05cc67e069..4a8ca69ba5 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -38,6 +38,56 @@ static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfR static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId); static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); +int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, + int64_t* oldStage) { + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); + ASSERT(pInfo != NULL); + + *oldStage = pInfo->stage; + const char* id = pTask->id.idStr; + if (stage == -1) { + stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id, + upstreamTaskId, vgId, stage); + return 0; + } + + if (pInfo->stage == -1) { + pInfo->stage = stage; + stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64, id, + upstreamTaskId, vgId, stage); + } + + if (pInfo->stage < stage) { + stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 + ", prev:%" PRId64, + id, upstreamTaskId, vgId, stage, pInfo->stage); + // record the checkpoint failure id and sent to mnode + taosThreadMutexLock(&pTask->lock); + ETaskStatus status = streamTaskGetStatus(pTask)->state; + if (status == TASK_STATUS__CK) { + streamTaskSetFailedCheckpointId(pTask); + } + taosThreadMutexUnlock(&pTask->lock); + } + + if (pInfo->stage != stage) { + + taosThreadMutexLock(&pTask->lock); + ETaskStatus status = streamTaskGetStatus(pTask)->state; + if (status == TASK_STATUS__CK) { + streamTaskSetFailedCheckpointId(pTask); + } + taosThreadMutexUnlock(&pTask->lock); + + return TASK_UPSTREAM_NEW_STAGE; + } else if (pTask->status.downstreamReady != 1) { + stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER)); + return TASK_DOWNSTREAM_NOT_READY; + } else { + return TASK_DOWNSTREAM_READY; + } +} + // check status void streamTaskSendCheckMsg(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; @@ -184,14 +234,7 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } - int32_t startTs = pTask->execInfo.checkTs; - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); - - // automatically set the related fill-history task to be failed. - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); - } + streamMetaAddFailedTaskSelf(pTask, now); } else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms ASSERT(left > 0); stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index becc692a07..4fa9b2c66f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1381,12 +1381,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - // todo: may be we should find the related fill-history task and set it failed. // todo: use hashTable instead SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); - streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, 0, now, false); + streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); continue; } @@ -1414,12 +1413,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { if (ret != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); code = ret; - - streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false); - } + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } streamMetaReleaseTask(pMeta, pTask); @@ -1491,12 +1485,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId); - streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false); + streamMetaAddFailedTask(pMeta, streamId, taskId); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } - // todo: may be we should find the related fill-history task and set it failed. - // fill-history task can only be launched by related stream tasks. STaskExecStatisInfo* pInfo = &pTask->execInfo; if (pTask->info.fillHistory == 1) { @@ -1508,13 +1500,8 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); if (ret != TSDB_CODE_SUCCESS) { - stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); - - streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->checkTs, pInfo->readyTs, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false); - } + stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT); + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } streamMetaReleaseTask(pMeta, pTask); @@ -1652,6 +1639,17 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta return code; } +void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { + int32_t startTs = pTask->execInfo.checkTs; + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); + + // automatically set the related fill-history task to be failed. + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pId = &pTask->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); + } +} + void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, int64_t startTs) { const char* id = pTask->id.idStr; diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index fb5b5e57d4..6882f6617d 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -130,56 +130,6 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) { return 0; } -int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, - int64_t* oldStage) { - SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); - ASSERT(pInfo != NULL); - - *oldStage = pInfo->stage; - const char* id = pTask->id.idStr; - if (stage == -1) { - stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id, - upstreamTaskId, vgId, stage); - return 0; - } - - if (pInfo->stage == -1) { - pInfo->stage = stage; - stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64, id, - upstreamTaskId, vgId, stage); - } - - if (pInfo->stage < stage) { - stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 - ", prev:%" PRId64, - id, upstreamTaskId, vgId, stage, pInfo->stage); - // record the checkpoint failure id and sent to mnode - taosThreadMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask)->state; - if (status == TASK_STATUS__CK) { - streamTaskSetFailedCheckpointId(pTask); - } - taosThreadMutexUnlock(&pTask->lock); - } - - if (pInfo->stage != stage) { - - taosThreadMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask)->state; - if (status == TASK_STATUS__CK) { - streamTaskSetFailedCheckpointId(pTask); - } - taosThreadMutexUnlock(&pTask->lock); - - return TASK_UPSTREAM_NEW_STAGE; - } else if (pTask->status.downstreamReady != 1) { - stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER)); - return TASK_DOWNSTREAM_NOT_READY; - } else { - return TASK_DOWNSTREAM_READY; - } -} - int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) { const char* id = pTask->id.idStr;