other: 服务器消息优化前

This commit is contained in:
kercylan98 2024-04-07 14:37:56 +08:00
parent ac929b6fcd
commit 16704bfbb6
5 changed files with 99 additions and 5 deletions

View File

@ -2,13 +2,22 @@ package server
import ( import (
"github.com/kercylan98/minotaur/server/internal/v2/queue" "github.com/kercylan98/minotaur/server/internal/v2/queue"
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
"github.com/kercylan98/minotaur/utils/log/v2" "github.com/kercylan98/minotaur/utils/log/v2"
"github.com/kercylan98/minotaur/utils/super" "github.com/kercylan98/minotaur/utils/super"
"runtime/debug" "runtime/debug"
) )
type MessageI interface {
// OnInitialize 消息初始化阶段将会被告知消息所在服务器、反应器、队列及标识信息
OnInitialize(srv Server, reactor *reactor.Reactor[Message], queue *queue.Queue[int, string, Message], ident string)
// OnProcess 消息处理阶段需要完成对消息的处理,并返回处理结果
OnProcess(finish func(err error))
}
type Message interface { type Message interface {
Execute() OnExecute()
} }
func SyncMessage(srv *server, handler func(srv *server)) Message { func SyncMessage(srv *server, handler func(srv *server)) Message {
@ -20,7 +29,7 @@ type syncMessage struct {
handler func(srv *server) handler func(srv *server)
} }
func (s *syncMessage) Execute() { func (s *syncMessage) OnExecute() {
s.handler(s.srv) s.handler(s.srv)
} }
@ -40,7 +49,7 @@ type asyncMessage struct {
callback func(srv *server, err error) callback func(srv *server, err error)
} }
func (s *asyncMessage) Execute() { func (s *asyncMessage) OnExecute() {
var q *queue.Queue[int, string, Message] var q *queue.Queue[int, string, Message]
var dispatch = func(ident string, message Message, beforeHandler ...func(queue *queue.Queue[int, string, Message], msg Message)) { var dispatch = func(ident string, message Message, beforeHandler ...func(queue *queue.Queue[int, string, Message], msg Message)) {
_ = s.srv.reactor.AutoDispatch(ident, message, beforeHandler...) _ = s.srv.reactor.AutoDispatch(ident, message, beforeHandler...)

View File

@ -0,0 +1,27 @@
package messages
import (
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/server/internal/v2/queue"
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
"github.com/kercylan98/minotaur/utils/super"
)
func NewAsync(handler func() error, callback func(err error)) server.MessageI {
return &Async{handler: handler}
}
type Async struct {
handler func() error
callback func(err error)
}
func (s *Async) OnInitialize(srv server.Server, reactor *reactor.Reactor[server.Message], queue *queue.Queue[int, string, server.Message], ident string) {
}
func (s *Async) OnProcess(finish func(err error)) {
defer finish(super.RecoverTransform(recover()))
s.handler()
}

View File

@ -0,0 +1,26 @@
package messages
import (
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/server/internal/v2/queue"
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
"github.com/kercylan98/minotaur/utils/super"
)
func NewSync(handler func()) server.MessageI {
return &Sync{handler: handler}
}
type Sync struct {
handler func()
}
func (s *Sync) OnInitialize(srv server.Server, reactor *reactor.Reactor[server.Message], queue *queue.Queue[int, string, server.Message], ident string) {
}
func (s *Sync) OnProcess(finish func(err error)) {
defer finish(super.RecoverTransform(recover()))
s.handler()
}

View File

@ -2,6 +2,7 @@ package server
import ( import (
"github.com/kercylan98/minotaur/utils/log/v2" "github.com/kercylan98/minotaur/utils/log/v2"
"os"
"sync" "sync"
"time" "time"
) )
@ -18,7 +19,7 @@ func DefaultOptions() *Options {
actorMessageBufferInitialSize: 1024, actorMessageBufferInitialSize: 1024,
messageErrorHandler: nil, messageErrorHandler: nil,
lifeCycleLimit: 0, lifeCycleLimit: 0,
logger: log.GetLogger(), logger: log.NewLogger(log.NewHandler(os.Stdout, log.DefaultOptions().WithCallerSkip(-1).WithLevel(log.LevelInfo))),
} }
} }
@ -33,6 +34,8 @@ type Options struct {
lifeCycleLimit time.Duration // 服务器生命周期上限,在服务器启动后达到生命周期上限将关闭服务器 lifeCycleLimit time.Duration // 服务器生命周期上限,在服务器启动后达到生命周期上限将关闭服务器
logger *log.Logger // 日志记录器 logger *log.Logger // 日志记录器
debug bool // Debug 模式 debug bool // Debug 模式
syncLowMessageDuration time.Duration // 同步慢消息时间
asyncLowMessageDuration time.Duration // 异步慢消息时间
} }
func (opt *Options) init(srv *server) *Options { func (opt *Options) init(srv *server) *Options {
@ -53,6 +56,9 @@ func (opt *Options) Apply(options ...*Options) {
opt.messageErrorHandler = option.messageErrorHandler opt.messageErrorHandler = option.messageErrorHandler
opt.lifeCycleLimit = option.lifeCycleLimit opt.lifeCycleLimit = option.lifeCycleLimit
opt.logger = option.logger opt.logger = option.logger
opt.debug = option.debug
opt.syncLowMessageDuration = option.syncLowMessageDuration
opt.asyncLowMessageDuration = option.asyncLowMessageDuration
option.rw.RUnlock() option.rw.RUnlock()
} }
@ -65,6 +71,32 @@ func (opt *Options) active() {
opt.server.notify.lifeCycleTime <- opt.GetLifeCycleLimit() opt.server.notify.lifeCycleTime <- opt.GetLifeCycleLimit()
} }
// WithSyncLowMessageMonitor 设置同步消息的慢消息监测时间
func (opt *Options) WithSyncLowMessageMonitor(duration time.Duration) *Options {
return opt.modifyOptionsValue(func(opt *Options) {
opt.syncLowMessageDuration = duration
})
}
func (opt *Options) GetSyncLowMessageDuration() time.Duration {
return getOptionsValue(opt, func(opt *Options) time.Duration {
return opt.syncLowMessageDuration
})
}
// WithAsyncLowMessageMonitor 设置异步消息的慢消息监测时间
func (opt *Options) WithAsyncLowMessageMonitor(duration time.Duration) *Options {
return opt.modifyOptionsValue(func(opt *Options) {
opt.asyncLowMessageDuration = duration
})
}
func (opt *Options) GetAsyncLowMessageDuration() time.Duration {
return getOptionsValue(opt, func(opt *Options) time.Duration {
return opt.asyncLowMessageDuration
})
}
// WithDebug 设置 Debug 模式是否开启 // WithDebug 设置 Debug 模式是否开启
// - 该函数支持运行时设置 // - 该函数支持运行时设置
func (opt *Options) WithDebug(debug bool) *Options { func (opt *Options) WithDebug(debug bool) *Options {

View File

@ -57,7 +57,7 @@ func NewServer(network Network, options ...*Options) Server {
srv.GetServerMessageChannelSize(), srv.GetActorMessageChannelSize(), srv.GetServerMessageChannelSize(), srv.GetActorMessageChannelSize(),
srv.GetServerMessageBufferInitialSize(), srv.GetActorMessageBufferInitialSize(), srv.GetServerMessageBufferInitialSize(), srv.GetActorMessageBufferInitialSize(),
func(message queue.MessageWrapper[int, string, Message]) { func(message queue.MessageWrapper[int, string, Message]) {
message.Message().Execute() message.Message().OnExecute()
}, func(message queue.MessageWrapper[int, string, Message], err error) { }, func(message queue.MessageWrapper[int, string, Message], err error) {
if handler := srv.GetMessageErrorHandler(); handler != nil { if handler := srv.GetMessageErrorHandler(); handler != nil {
handler(srv, message.Message(), err) handler(srv, message.Message(), err)