fix(stream): init backend for fill-history task.

This commit is contained in:
Haojun Liao 2024-05-15 00:08:38 +08:00
parent 4eacf86843
commit 610aa80e65
6 changed files with 37 additions and 22 deletions

View File

@ -60,7 +60,6 @@ extern "C" {
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
// the load and start stream task should be executed after snode has started successfully, since the load of stream
// tasks may incur the download of checkpoint data from remote, which may consume significant network and CPU resources.
#define STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS (-8)
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;

View File

@ -745,10 +745,6 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
return code;
} else if (type == STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS) {
streamMetaLoadAllTasks(pMeta);
int32_t code = streamMetaStartAllTasks(pMeta, tqExpandStreamTask);
return code;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);

View File

@ -166,7 +166,7 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskDownloadCheckpointData(const char* id, char* path);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask);
typedef int32_t (*__stream_async_exec_fn_t)(void* param);

View File

@ -1376,13 +1376,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
return TSDB_CODE_SUCCESS;
}
numOfTasks = taosArrayGetSize(pTaskList);
// broadcast the check downstream tasks msg
numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
// todo: use hashTable instead
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
@ -1391,13 +1389,14 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
}
STaskExecStatisInfo* pInfo = &pTask->execInfo;
code = expandFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:0x%x vgId:%d failed to build stream backend", pTaskId->taskId, vgId);
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
streamMetaReleaseTask(pMeta, pTask);
continue;
if (pTask->pBackend == NULL) {
code = expandFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
}
// fill-history task can only be launched by related stream tasks.
@ -1407,6 +1406,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
continue;
}
// ready now, start the related fill-history task
if (pTask->status.downstreamReady == 1) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
@ -1429,7 +1429,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
streamMetaReleaseTask(pMeta, pTask);
}
stInfo("vgId:%d start tasks completed", pMeta->vgId);
stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
taosArrayDestroy(pTaskList);
return code;
}
@ -1494,7 +1494,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId);
stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId);
streamMetaAddFailedTask(pMeta, streamId, taskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
@ -1507,9 +1507,29 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
}
ASSERT(pTask->status.downstreamReady == 0);
if (pTask->pBackend == NULL) { // todo handle the error code
if (pTask->pBackend == NULL) {
int32_t code = expandFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHTask != NULL) {
if (pHTask->pBackend == NULL) {
code = expandFn(pHTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pHTask, pInfo->readyTs);
streamMetaReleaseTask(pMeta, pHTask);
return code;
}
}
streamMetaReleaseTask(pMeta, pHTask);
}
}
}
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);

View File

@ -155,7 +155,7 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) {
// set the state to be ready
streamTaskSetReady(pTask);
streamTaskSetRangeStreamCalc(pTask);

View File

@ -584,7 +584,7 @@ void doInitStateTransferTable(void) {
// initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, NULL);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanHistoryTaskReady, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event