fix(stream): pause not stop the start procedure of stream tasks.

This commit is contained in:
Haojun Liao 2024-06-24 13:25:48 +08:00
parent 393369fcd8
commit bd8c7d8c93
2 changed files with 3 additions and 27 deletions

View File

@ -998,18 +998,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
ASSERT (status != TASK_STATUS__UNINIT); /*{
// tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr);
//
// if (pTask->pBackend == NULL) { // TODO: add test cases for this
// int32_t code = pMeta->expandTaskFn(pTask);
// if (code != TSDB_CODE_SUCCESS) {
// tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId);
// streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
// }
// }
// int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
}*/
ASSERT (status != TASK_STATUS__UNINIT);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
@ -1036,20 +1025,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
streamTrySchedExec(pTask);
}
} else {
ASSERT (status != TASK_STATUS__UNINIT);// { // todo: fill-history task init ?
// if (pTask->info.fillHistory == 0) {
// tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr);
//
// if (pTask->pBackend == NULL) { // TODO: add test cases for this
// int32_t code = pMeta->expandTaskFn(pTask);
// if (code != TSDB_CODE_SUCCESS) {
// tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId);
// streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
// }
// }
// int32_t ret = */streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
// }
// }
ASSERT (status != TASK_STATUS__UNINIT);
}
streamMetaReleaseTask(pMeta, pTask);

View File

@ -204,7 +204,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
// check stream task status in the first place.
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) {
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT && pStatus->state != TASK_STATUS__PAUSE) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus->name);