commit
0eb94d97d3
|
@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
|
||||||
int32_t sz = taosArrayGetSize(pTasks);
|
int32_t sz = taosArrayGetSize(pTasks);
|
||||||
for (int32_t j = 0; j < sz; j++) {
|
for (int32_t j = 0; j < sz; j++) {
|
||||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||||
if (mndPauseStreamTask(pTrans, pTask) < 0) {
|
if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
|
||||||
int32_t sz = taosArrayGetSize(pTasks);
|
int32_t sz = taosArrayGetSize(pTasks);
|
||||||
for (int32_t j = 0; j < sz; j++) {
|
for (int32_t j = 0; j < sz; j++) {
|
||||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||||
if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
|
if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
||||||
|
|
||||||
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
int8_t status = atomic_load_8(&pTask->status.taskStatus);
|
int8_t status = atomic_load_8(&pTask->status.taskStatus);
|
||||||
if (status != TASK_STATUS__NORMAL) {
|
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
|
||||||
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
|
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
|
||||||
atomic_load_8(&pTask->status.taskStatus));
|
atomic_load_8(&pTask->status.taskStatus));
|
||||||
taosMsleep(2);
|
taosMsleep(2);
|
||||||
|
@ -408,7 +408,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
qDebug("s-task:%s exec completed", pTask->id.idStr);
|
qDebug("s-task:%s exec completed", pTask->id.idStr);
|
||||||
|
|
||||||
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) {
|
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue