fix(stream): reset update stream tasks.
This commit is contained in:
parent
082882b71e
commit
873c22abb1
|
@ -80,6 +80,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||||
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId);
|
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId);
|
||||||
ASSERT(pTask->exec.pExecutor);
|
ASSERT(pTask->exec.pExecutor);
|
||||||
|
|
||||||
|
streamTaskResetUpstreamStageInfo(pTask);
|
||||||
streamSetupScheduleTrigger(pTask);
|
streamSetupScheduleTrigger(pTask);
|
||||||
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
|
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
|
||||||
pTask->id.idStr, pTask->chkInfo.checkpointVer, pTask->info.selfChildId, pTask->info.taskLevel);
|
pTask->id.idStr, pTask->chkInfo.checkpointVer, pTask->info.selfChildId, pTask->info.taskLevel);
|
||||||
|
|
|
@ -1033,6 +1033,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->status.taskStatus = TASK_STATUS__NORMAL;
|
pTask->status.taskStatus = TASK_STATUS__NORMAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamTaskResetUpstreamStageInfo(pTask);
|
||||||
streamSetupScheduleTrigger(pTask);
|
streamSetupScheduleTrigger(pTask);
|
||||||
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue