fix(stream): remove invalid assert and add check for task status before launching related fill-history task.
This commit is contained in:
parent
fa9df2a495
commit
33f698e926
|
@ -1160,7 +1160,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
|
|||
} else if (status == TASK_STATUS__UNINIT) {
|
||||
// todo: fill-history task init ?
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
EStreamTaskEvent event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
|
||||
EStreamTaskEvent event = TASK_EVENT_INIT;
|
||||
streamTaskHandleEvent(pTask->status.pSM, event);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -388,12 +388,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
||||
EStreamTaskEvent event;
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
|
||||
} else {
|
||||
event = TASK_EVENT_INIT_SCANHIST;
|
||||
}
|
||||
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
|
||||
|
||||
streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
|
||||
|
||||
|
@ -820,11 +815,23 @@ SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, in
|
|||
// an fill history task needs to be started.
|
||||
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
|
||||
|
||||
ASSERT((hTaskId != 0) && (pTask->status.downstreamReady == 1));
|
||||
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
|
||||
pTask->hTaskInfo.id.streamId, hTaskId);
|
||||
int64_t streamId = pTask->hTaskInfo.id.streamId;
|
||||
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
|
||||
ASSERT(hTaskId != 0);
|
||||
|
||||
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
|
||||
if (pStatus->state != TASK_STATUS__READY) {
|
||||
STaskExecStatisInfo* pInfo = &pTask->execInfo;
|
||||
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", pTask->id.idStr,
|
||||
pTask->hTaskInfo.id.streamId, hTaskId);
|
||||
|
||||
streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, hTaskId, pInfo->init, pInfo->start, false);
|
||||
return -1;// todo set the correct error code
|
||||
} else {
|
||||
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
|
||||
pTask->hTaskInfo.id.streamId, hTaskId);
|
||||
}
|
||||
|
||||
// Set the execute conditions, including the query time window and the version range
|
||||
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
|
||||
|
|
|
@ -38,7 +38,6 @@ SStreamEventInfo StreamTaskEventList[12] = {
|
|||
{.event = 0, .name = ""}, // dummy event, place holder
|
||||
{.event = TASK_EVENT_INIT, .name = "initialize"},
|
||||
{.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-init"},
|
||||
// {.event = TASK_EVENT_INIT_STREAM_SCANHIST, .name = "stream-scan-history-init"},
|
||||
{.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"},
|
||||
{.event = TASK_EVENT_STOP, .name = "stopping"},
|
||||
{.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"},
|
||||
|
|
Loading…
Reference in New Issue