fix(stream): add missing pushed data.
This commit is contained in:
parent
9d2256ec9c
commit
d7fb9c5b6e
|
@ -168,7 +168,6 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
|
||||||
} else if (level == TASK_LEVEL__AGG) {
|
} else if (level == TASK_LEVEL__AGG) {
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
streamSetParamForScanHistory(pTask);
|
streamSetParamForScanHistory(pTask);
|
||||||
streamTaskEnablePause(pTask);
|
|
||||||
}
|
}
|
||||||
} else if (level == TASK_LEVEL__SINK) {
|
} else if (level == TASK_LEVEL__SINK) {
|
||||||
stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
|
stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
|
||||||
|
@ -345,7 +344,6 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
|
||||||
stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus);
|
stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskEnablePause(pTask);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -659,9 +657,6 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
|
||||||
streamMetaCommit(pMeta);
|
streamMetaCommit(pMeta);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
// history data scan in the stream time window finished, now let's enable the pause
|
|
||||||
streamTaskEnablePause(pTask);
|
|
||||||
|
|
||||||
// for source tasks, let's continue execute.
|
// for source tasks, let's continue execute.
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
|
@ -1040,11 +1035,6 @@ void streamTaskResume(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTaskEnablePause(SStreamTask* pTask) {
|
|
||||||
stDebug("s-task:%s enable task pause", pTask->id.idStr);
|
|
||||||
pTask->status.pauseAllowed = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
|
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
|
|
Loading…
Reference in New Issue