diff --git a/server/internal/v2/message.go b/server/internal/v2/message.go index 325343b..128316d 100644 --- a/server/internal/v2/message.go +++ b/server/internal/v2/message.go @@ -2,13 +2,22 @@ package server import ( "github.com/kercylan98/minotaur/server/internal/v2/queue" + "github.com/kercylan98/minotaur/server/internal/v2/reactor" "github.com/kercylan98/minotaur/utils/log/v2" "github.com/kercylan98/minotaur/utils/super" "runtime/debug" ) +type MessageI interface { + // OnInitialize 消息初始化阶段将会被告知消息所在服务器、反应器、队列及标识信息 + OnInitialize(srv Server, reactor *reactor.Reactor[Message], queue *queue.Queue[int, string, Message], ident string) + + // OnProcess 消息处理阶段需要完成对消息的处理,并返回处理结果 + OnProcess(finish func(err error)) +} + type Message interface { - Execute() + OnExecute() } func SyncMessage(srv *server, handler func(srv *server)) Message { @@ -20,7 +29,7 @@ type syncMessage struct { handler func(srv *server) } -func (s *syncMessage) Execute() { +func (s *syncMessage) OnExecute() { s.handler(s.srv) } @@ -40,7 +49,7 @@ type asyncMessage struct { callback func(srv *server, err error) } -func (s *asyncMessage) Execute() { +func (s *asyncMessage) OnExecute() { var q *queue.Queue[int, string, Message] var dispatch = func(ident string, message Message, beforeHandler ...func(queue *queue.Queue[int, string, Message], msg Message)) { _ = s.srv.reactor.AutoDispatch(ident, message, beforeHandler...) diff --git a/server/internal/v2/messages/async.go b/server/internal/v2/messages/async.go new file mode 100644 index 0000000..c168655 --- /dev/null +++ b/server/internal/v2/messages/async.go @@ -0,0 +1,27 @@ +package messages + +import ( + "github.com/kercylan98/minotaur/server/internal/v2" + "github.com/kercylan98/minotaur/server/internal/v2/queue" + "github.com/kercylan98/minotaur/server/internal/v2/reactor" + "github.com/kercylan98/minotaur/utils/super" +) + +func NewAsync(handler func() error, callback func(err error)) server.MessageI { + return &Async{handler: handler} +} + +type Async struct { + handler func() error + callback func(err error) +} + +func (s *Async) OnInitialize(srv server.Server, reactor *reactor.Reactor[server.Message], queue *queue.Queue[int, string, server.Message], ident string) { + +} + +func (s *Async) OnProcess(finish func(err error)) { + defer finish(super.RecoverTransform(recover())) + + s.handler() +} diff --git a/server/internal/v2/messages/sync.go b/server/internal/v2/messages/sync.go new file mode 100644 index 0000000..dac04dc --- /dev/null +++ b/server/internal/v2/messages/sync.go @@ -0,0 +1,26 @@ +package messages + +import ( + "github.com/kercylan98/minotaur/server/internal/v2" + "github.com/kercylan98/minotaur/server/internal/v2/queue" + "github.com/kercylan98/minotaur/server/internal/v2/reactor" + "github.com/kercylan98/minotaur/utils/super" +) + +func NewSync(handler func()) server.MessageI { + return &Sync{handler: handler} +} + +type Sync struct { + handler func() +} + +func (s *Sync) OnInitialize(srv server.Server, reactor *reactor.Reactor[server.Message], queue *queue.Queue[int, string, server.Message], ident string) { + +} + +func (s *Sync) OnProcess(finish func(err error)) { + defer finish(super.RecoverTransform(recover())) + + s.handler() +} diff --git a/server/internal/v2/options.go b/server/internal/v2/options.go index c214417..dfc8171 100644 --- a/server/internal/v2/options.go +++ b/server/internal/v2/options.go @@ -2,6 +2,7 @@ package server import ( "github.com/kercylan98/minotaur/utils/log/v2" + "os" "sync" "time" ) @@ -18,7 +19,7 @@ func DefaultOptions() *Options { actorMessageBufferInitialSize: 1024, messageErrorHandler: nil, lifeCycleLimit: 0, - logger: log.GetLogger(), + logger: log.NewLogger(log.NewHandler(os.Stdout, log.DefaultOptions().WithCallerSkip(-1).WithLevel(log.LevelInfo))), } } @@ -33,6 +34,8 @@ type Options struct { lifeCycleLimit time.Duration // 服务器生命周期上限,在服务器启动后达到生命周期上限将关闭服务器 logger *log.Logger // 日志记录器 debug bool // Debug 模式 + syncLowMessageDuration time.Duration // 同步慢消息时间 + asyncLowMessageDuration time.Duration // 异步慢消息时间 } func (opt *Options) init(srv *server) *Options { @@ -53,6 +56,9 @@ func (opt *Options) Apply(options ...*Options) { opt.messageErrorHandler = option.messageErrorHandler opt.lifeCycleLimit = option.lifeCycleLimit opt.logger = option.logger + opt.debug = option.debug + opt.syncLowMessageDuration = option.syncLowMessageDuration + opt.asyncLowMessageDuration = option.asyncLowMessageDuration option.rw.RUnlock() } @@ -65,6 +71,32 @@ func (opt *Options) active() { opt.server.notify.lifeCycleTime <- opt.GetLifeCycleLimit() } +// WithSyncLowMessageMonitor 设置同步消息的慢消息监测时间 +func (opt *Options) WithSyncLowMessageMonitor(duration time.Duration) *Options { + return opt.modifyOptionsValue(func(opt *Options) { + opt.syncLowMessageDuration = duration + }) +} + +func (opt *Options) GetSyncLowMessageDuration() time.Duration { + return getOptionsValue(opt, func(opt *Options) time.Duration { + return opt.syncLowMessageDuration + }) +} + +// WithAsyncLowMessageMonitor 设置异步消息的慢消息监测时间 +func (opt *Options) WithAsyncLowMessageMonitor(duration time.Duration) *Options { + return opt.modifyOptionsValue(func(opt *Options) { + opt.asyncLowMessageDuration = duration + }) +} + +func (opt *Options) GetAsyncLowMessageDuration() time.Duration { + return getOptionsValue(opt, func(opt *Options) time.Duration { + return opt.asyncLowMessageDuration + }) +} + // WithDebug 设置 Debug 模式是否开启 // - 该函数支持运行时设置 func (opt *Options) WithDebug(debug bool) *Options { diff --git a/server/internal/v2/server.go b/server/internal/v2/server.go index 9926f76..3f5066d 100644 --- a/server/internal/v2/server.go +++ b/server/internal/v2/server.go @@ -57,7 +57,7 @@ func NewServer(network Network, options ...*Options) Server { srv.GetServerMessageChannelSize(), srv.GetActorMessageChannelSize(), srv.GetServerMessageBufferInitialSize(), srv.GetActorMessageBufferInitialSize(), func(message queue.MessageWrapper[int, string, Message]) { - message.Message().Execute() + message.Message().OnExecute() }, func(message queue.MessageWrapper[int, string, Message], err error) { if handler := srv.GetMessageErrorHandler(); handler != nil { handler(srv, message.Message(), err)