diff --git a/modules/agent/api/instance.go b/modules/agent/api/instance.go index eab6e74f..49ae9922 100644 --- a/modules/agent/api/instance.go +++ b/modules/agent/api/instance.go @@ -725,6 +725,10 @@ func getAgentTaskSetting(agentID string, node agent.ESNodeInfo) (*agent.Setting, if err != nil { return nil, err } + taskSetting.Logs = &model.LogsTask{ + Enabled:true, + LogsPath: node.Path.Logs, + } return &agent.Setting{ Metadata: agent.SettingsMetadata{ Category: "agent", diff --git a/modules/agent/common/helper.go b/modules/agent/common/helper.go index cc7ea55d..43694888 100644 --- a/modules/agent/common/helper.go +++ b/modules/agent/common/helper.go @@ -220,9 +220,35 @@ func TransformSettingsToConfig(setting *model.TaskSetting, clusterID, nodeUUID s toDeletePipelineNames = append(toDeletePipelineNames, getMetricPipelineName(nodeUUID, processorName)) } } + if setting.Logs != nil { + var processorName = "es_logs_processor" + if setting.Logs.Enabled { + params := util.MapStr{ + "elasticsearch": clusterID, + "queue_name": "logs", + } + if setting.Logs.LogsPath != "" { + params["logs_path"] = setting.Logs.LogsPath + } + cfg := util.MapStr{ + processorName: params, + } + enabled := true + pipelineCfg := util.MapStr{ + "enabled": &enabled, + "name": fmt.Sprintf("collect_%s_es_logs", nodeUUID), + "auto_start": true, + "keep_running": true, + "retry_delay_in_ms": 3000, + "processor": []util.MapStr{cfg}, + } + pipelines = append(pipelines, pipelineCfg) + } + } return pipelines, toDeletePipelineNames, nil } + func newClusterMetricPipeline(processorName string, clusterID string)(util.MapStr, error){ cfg := util.MapStr{ processorName: util.MapStr{ diff --git a/modules/agent/model/task.go b/modules/agent/model/task.go index d8b42006..eed98ee4 100644 --- a/modules/agent/model/task.go +++ b/modules/agent/model/task.go @@ -36,6 +36,7 @@ type NodeStatsTask struct { type LogsTask struct { Enabled bool `json:"enabled"` + LogsPath string `json:"logs_path"` } type ParseAgentSettingsResult struct {