[migration] check finished pipeline logs by updated time
This commit is contained in:
parent
99cf8b769f
commit
d1e947f826
|
@ -317,9 +317,19 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, errs []string) {
|
func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, errs []string) {
|
||||||
|
newHits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to get latest pipeline logs for task [%s], err: %v", taskItem.ID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(newHits) == 0 {
|
||||||
|
log.Debugf("task [%s] not finished yet since last start", taskItem.ID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0)
|
hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
|
log.Errorf("failed to get all pipeline logs for task [%s], err: %v", taskItem.ID, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -356,6 +366,7 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i
|
||||||
|
|
||||||
func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]elastic.IndexDocument, error) {
|
func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]elastic.IndexDocument, error) {
|
||||||
query := util.MapStr{
|
query := util.MapStr{
|
||||||
|
"size": 999,
|
||||||
"sort": []util.MapStr{
|
"sort": []util.MapStr{
|
||||||
{
|
{
|
||||||
"timestamp": util.MapStr{
|
"timestamp": util.MapStr{
|
||||||
|
@ -368,9 +379,6 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timest
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"collapse": util.MapStr{
|
|
||||||
"field": "metadata.labels.task_id",
|
|
||||||
},
|
|
||||||
"query": util.MapStr{
|
"query": util.MapStr{
|
||||||
"bool": util.MapStr{
|
"bool": util.MapStr{
|
||||||
"must": []util.MapStr{
|
"must": []util.MapStr{
|
||||||
|
|
Loading…
Reference in New Issue