diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0d1e62cdbe..90cb06ff42 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -641,12 +641,13 @@ bool streamTaskShouldPause(const SStreamTask* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus); -int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId); -SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask); -const char* streamTaskGetStatusStr(ETaskStatus status); -void streamTaskResetStatus(SStreamTask* pTask); -void streamTaskSetStatusReady(SStreamTask* pTask); -ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask); +int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId); +SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask); +const char* streamTaskGetStatusStr(ETaskStatus status); +void streamTaskResetStatus(SStreamTask* pTask); +void streamTaskSetStatusReady(SStreamTask* pTask); +ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask); +const char* streamTaskGetExecType(int32_t type); bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f817447099..f7c61b48e3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -355,43 +355,15 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock return code; } -/** - * All down stream tasks have successfully completed the check point task. - * Current stream task is allowed to start to do checkpoint things in ASYNC model. 
- */ -int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId, - int32_t downstreamTaskId) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); - SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - - const char* id = pTask->id.idStr; - bool received = false; - int32_t total = streamTaskGetNumOfDownstream(pTask); - ASSERT(total > 0); - - // 1. not in checkpoint status now - SStreamTaskState pStat = streamTaskGetStatus(pTask); - if (pStat.state != TASK_STATUS__CK) { - stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } - - // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg - if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) { - stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64 - ") from task:0x%x, expired and discard ", - id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId); - return -1; - } - - streamMutexLock(&pInfo->lock); - - // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task +// only when all downstream tasks have sent the checkpoint rsp, we can start the checkpoint procedure for the agg task +static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream, + int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId, + const char* id, int32_t* pNotReady, int32_t* pTransId) { + bool received = false; int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList); for (int32_t i = 0; i < size; ++i) { STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); if (p == NULL) { - streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -403,27 +375,69 @@ int32_t 
streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId if (received) { stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id, - downstreamTaskId, (int32_t)(total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), total); + downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), + numOfDownstream); } else { STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(), .downstreamTaskId = downstreamTaskId, .checkpointId = pInfo->activeId, .transId = pInfo->transId, - .streamId = pTask->id.streamId, + .streamId = streamId, .downstreamNodeId = downstreamNodeId}; - (void)taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); + void* p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); + if (p == NULL) { + stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno)); + return terrno; + } } - int32_t notReady = total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList); - int32_t transId = pInfo->transId; + *pNotReady = numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList); + *pTransId = pInfo->transId; + return 0; +} + +/** + * All down stream tasks have successfully completed the check point task. + * Current stream task is allowed to start to do checkpoint things in ASYNC model. + */ +int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId, + int32_t downstreamTaskId) { + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + + const char* id = pTask->id.idStr; + int32_t total = streamTaskGetNumOfDownstream(pTask); + int32_t code = 0; + int32_t notReady = 0; + int32_t transId = 0; + + ASSERT(total > 0 && (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG)); + + // 1. 
not in checkpoint status now + SStreamTaskState pStat = streamTaskGetStatus(pTask); + if (pStat.state != TASK_STATUS__CK) { + stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + + // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg + if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) { + stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64 + ") from task:0x%x, expired and discard", + id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId); + return TSDB_CODE_INVALID_MSG; + } + + streamMutexLock(&pInfo->lock); + code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, &notReady, + &transId); streamMutexUnlock(&pInfo->lock); - if (notReady == 0) { + if ((notReady == 0) && (code == 0)) { stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id); (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); } - return 0; + return code; } int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) { @@ -1034,8 +1048,7 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); if (p == NULL) { - streamMutexUnlock(&pInfo->lock); - return num; + continue; } if (p->recved) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 08cf490b94..8ecd62d1eb 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1464,14 +1464,16 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { } int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t 
streamId, int32_t taskId) { - int32_t code = 0; - int32_t vgId = pMeta->vgId; + int32_t code = 0; + int32_t vgId = pMeta->vgId; + SStreamTask* pTask = NULL; + bool continueExec = true; + stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); - SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); if (pTask == NULL) { - stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId); + stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId); (void)streamMetaAddFailedTask(pMeta, streamId, taskId); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } @@ -1479,10 +1481,26 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas // fill-history task can only be launched by related stream tasks. STaskExecStatisInfo* pInfo = &pTask->execInfo; if (pTask->info.fillHistory == 1) { + stError("s-task:0x%x vgId:%d fill-history task, not start here", taskId, vgId); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } + streamMutexLock(&pTask->lock); + SStreamTaskState status = streamTaskGetStatus(pTask); + if (status.state != TASK_STATUS__UNINIT) { + stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name); + continueExec = false; + } else { + continueExec = true; + } + streamMutexUnlock(&pTask->lock); + + if (!continueExec) { + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + ASSERT(pTask->status.downstreamReady == 0); // avoid initialization and destroy running concurrently. 
@@ -1501,7 +1519,8 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas if (code == TSDB_CODE_SUCCESS) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT); + stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT, + tstrerror(code)); streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index f9fcf36668..a83a0e4cc8 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -48,14 +48,15 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType, - terrstr()); - return TSDB_CODE_OUT_OF_MEMORY; + tstrerror(terrno)); + return terrno; } if (streamId != 0) { - stDebug("vgId:%d create msg to start stream task:0x%x, exec type:%d", vgId, taskId, execType); + stDebug("vgId:%d create msg for task:0x%x, exec type:%d, %s", vgId, taskId, execType, + streamTaskGetExecType(execType)); } else { - stDebug("vgId:%d create msg to exec, type:%d", vgId, execType); + stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType)); } pRunReq->head.vgId = vgId; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a249bad724..f07fd81953 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1149,4 +1149,25 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pCheckpointReadyRecvList); +} + +const char* 
streamTaskGetExecType(int32_t type) { + switch (type) { + case STREAM_EXEC_T_EXTRACT_WAL_DATA: + return "scan-wal-file"; + case STREAM_EXEC_T_START_ALL_TASKS: + return "start-all-tasks"; + case STREAM_EXEC_T_START_ONE_TASK: + return "start-one-task"; + case STREAM_EXEC_T_RESTART_ALL_TASKS: + return "restart-all-tasks"; + case STREAM_EXEC_T_STOP_ALL_TASKS: + return "stop-all-tasks"; + case STREAM_EXEC_T_RESUME_TASK: + return "resume-task-from-idle"; + case STREAM_EXEC_T_ADD_FAILED_TASK: + return "record-start-failed-task"; + default: + return "invalid-exec-type"; + } } \ No newline at end of file diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 275c9255d2..0779eede9f 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -322,12 +322,11 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt if (pTrans->attachEvent.event != 0) { code = attachWaitedEvent(pTask, &pTrans->attachEvent); + streamMutexUnlock(&pTask->lock); if (code) { return code; } - streamMutexUnlock(&pTask->lock); - while (1) { // wait for the task to be here streamMutexLock(&pTask->lock); @@ -557,6 +556,11 @@ ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask) { } const char* streamTaskGetStatusStr(ETaskStatus status) { + int32_t index = status; + if (index < 0 || index >= tListLen(StreamTaskStatusList)) { + return ""; + } + return StreamTaskStatusList[status].name; }