diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1f5aa46f49..7cd8391c80 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -60,7 +60,6 @@ extern "C" { #define STREAM_EXEC_T_ADD_FAILED_TASK (-7) // the load and start stream task should be executed after snode has started successfully, since the load of stream // tasks may incur the download of checkpoint data from remote, which may consume significant network and CPU resources. -#define STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS (-8) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index dbda3a4541..544e820695 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -745,10 +745,6 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); return code; - } else if (type == STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS) { - streamMetaLoadAllTasks(pMeta); - int32_t code = streamMetaStartAllTasks(pMeta, tqExpandStreamTask); - return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 0ac10fe9fe..10bdccdb29 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -166,7 +166,7 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); int32_t streamTaskDownloadCheckpointData(const char* id, char* path); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); -int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); +int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask); typedef int32_t (*__stream_async_exec_fn_t)(void* param); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ec5cf2f4f6..5402c066a2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1376,13 +1376,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa return TSDB_CODE_SUCCESS; } - numOfTasks = taosArrayGetSize(pTaskList); - // broadcast the check downstream tasks msg + numOfTasks = taosArrayGetSize(pTaskList); for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - // todo: use hashTable instead SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); @@ -1391,13 +1389,14 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa } STaskExecStatisInfo* pInfo = &pTask->execInfo; - - code = expandFn(pTask); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:0x%x vgId:%d failed to build stream backend", pTaskId->taskId, vgId); - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); - streamMetaReleaseTask(pMeta, pTask); - continue; + if (pTask->pBackend == NULL) { + code = expandFn(pTask); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + streamMetaReleaseTask(pMeta, pTask); + continue; + } } // fill-history task can only be launched by related stream tasks. @@ -1407,6 +1406,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa continue; } + // ready now, start the related fill-history task if (pTask->status.downstreamReady == 1) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", @@ -1429,7 +1429,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa streamMetaReleaseTask(pMeta, pTask); } - stInfo("vgId:%d start tasks completed", pMeta->vgId); + stInfo("vgId:%d start all task(s) completed", pMeta->vgId); taosArrayDestroy(pTaskList); return code; } @@ -1494,7 +1494,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId); if (pTask == NULL) { - stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId); + stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId); streamMetaAddFailedTask(pMeta, streamId, taskId); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } @@ -1507,9 +1507,29 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas } ASSERT(pTask->status.downstreamReady == 0); - - if (pTask->pBackend == NULL) { // todo handle the error code + if (pTask->pBackend == NULL) { int32_t code = expandFn(pTask); + if (code != TSDB_CODE_SUCCESS) { + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + streamMetaReleaseTask(pMeta, pTask); + return code; + } + + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); + if (pHTask != NULL) { + if (pHTask->pBackend == NULL) { + code = expandFn(pHTask); + if (code != TSDB_CODE_SUCCESS) { + streamMetaAddFailedTaskSelf(pHTask, pInfo->readyTs); + streamMetaReleaseTask(pMeta, pHTask); + return code; + } + } + + streamMetaReleaseTask(pMeta, pHTask); + } + } } int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 6882f6617d..7a864a60d2 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -155,7 +155,7 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { +int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) { // set the state to be ready streamTaskSetReady(pTask); streamTaskSetRangeStreamCalc(pTask); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index cced6a6b84..82ea2f88ef 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -584,7 +584,7 @@ void doInitStateTransferTable(void) { // initialization event handle STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, NULL); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanHistoryTaskReady, NULL); taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event