clear call pipeline module

This commit is contained in:
liugq 2023-07-20 12:26:58 +08:00
parent a5e036d62a
commit f1cb35e57e
2 changed files with 32 additions and 41 deletions

View File

@ -66,9 +66,7 @@ func main() {
modules = append(modules, &elastic2.ElasticModule{}) modules = append(modules, &elastic2.ElasticModule{})
modules = append(modules, &queue2.DiskQueue{}) modules = append(modules, &queue2.DiskQueue{})
modules = append(modules, &redis.RedisModule{}) modules = append(modules, &redis.RedisModule{})
pipeM := &pipeline.PipeModule{} modules = append(modules, &pipeline.PipeModule{})
global.Register("pipeline_module", pipeM)
modules = append(modules, pipeM)
modules = append(modules, &task.TaskModule{}) modules = append(modules, &task.TaskModule{})
modules = append(modules, &metrics.MetricsModule{}) modules = append(modules, &metrics.MetricsModule{})
modules = append(modules, &security.Module{}) modules = append(modules, &security.Module{})

View File

@ -8,52 +8,45 @@ import (
"fmt" "fmt"
"infini.sh/console/model" "infini.sh/console/model"
"infini.sh/framework/core/global" "infini.sh/framework/core/global"
"infini.sh/framework/core/pipeline" "infini.sh/framework/core/util"
"infini.sh/framework/lib/go-ucfg/yaml" "os"
pipeline2 "infini.sh/framework/modules/pipeline" "path"
) )
func StartEmailServer(serv *model.EmailServer) error { func StartEmailServer(serv *model.EmailServer) error {
pipeCfgStr := GeneratePipelineConfig(serv) pipeCfgStr := GeneratePipelineConfig(serv)
cfg, err := yaml.NewConfig([]byte(pipeCfgStr)) //cfg, err := yaml.NewConfig([]byte(pipeCfgStr))
if err != nil { //if err != nil {
return fmt.Errorf("new config error: %w", err) // return fmt.Errorf("new config error: %w", err)
} //}
pipeCfg := pipeline.PipelineConfigV2{} cfgDir := global.Env().GetConfigDir()
err = cfg.Unpack(&pipeCfg) sendEmailCfgFile := path.Join(cfgDir, "send_email.yml")
if err != nil { _, err := util.FilePutContent(sendEmailCfgFile, pipeCfgStr)
return fmt.Errorf("unpack pipeline config error: %w", err) return err
} //pipeCfg := pipeline.PipelineConfigV2{}
v := global.Lookup("pipeline_module") //err = cfg.Unpack(&pipeCfg)
var ( //if err != nil {
pipeM *pipeline2.PipeModule // return fmt.Errorf("unpack pipeline config error: %w", err)
ok bool //}
) //v := global.Lookup("pipeline_module")
if pipeM, ok = v.(*pipeline2.PipeModule); !ok { //var (
return fmt.Errorf("can not find pipeline module") // pipeM *pipeline2.PipeModule
} // ok bool
err = pipeM.CreatePipeline(pipeCfg, true) //)
if err != nil { //if pipeM, ok = v.(*pipeline2.PipeModule); !ok {
return fmt.Errorf("create email server pipeline error: %w", err) // return fmt.Errorf("can not find pipeline module")
} //}
//err = pipeM.CreatePipeline(pipeCfg, true)
//if err != nil {
// return fmt.Errorf("create email server pipeline error: %w", err)
//}
return nil return nil
} }
func StopEmailServer(serv *model.EmailServer) error { func StopEmailServer(serv *model.EmailServer) error {
v := global.Lookup("pipeline_module") cfgDir := global.Env().GetConfigDir()
var ( sendEmailCfgFile := path.Join(cfgDir, "send_email.yml")
pipeM *pipeline2.PipeModule return os.RemoveAll(sendEmailCfgFile)
ok bool
)
if pipeM, ok = v.(*pipeline2.PipeModule); !ok {
return fmt.Errorf("can not find pipeline module")
}
emailServerTaskID := getEmailServerTaskID(serv)
exists := pipeM.StopTask(emailServerTaskID)
if exists {
pipeM.DeleteTask(emailServerTaskID)
}
return nil
} }
func getEmailServerTaskID(serv *model.EmailServer) string { func getEmailServerTaskID(serv *model.EmailServer) string {