other: server 包部分内容可读性优化,增加健壮度

This commit is contained in:
kercylan98 2023-12-29 12:02:12 +08:00
parent 5b53e8a2ac
commit 472fdc3a18
3 changed files with 23 additions and 26 deletions

View File

@ -1,6 +1,7 @@
package server
import (
"github.com/kercylan98/minotaur/utils/super"
"github.com/xtaci/kcp-go/v5"
"net"
"sync"
@ -21,14 +22,14 @@ func (l *listener) init() *listener {
func (l *listener) Accept() (net.Conn, error) {
l.once.Do(func() {
l.state <- nil
super.TryWriteChannel(l.state, nil)
})
return l.Listener.Accept()
}
func (l *listener) AcceptKCP() (*kcp.UDPSession, error) {
l.once.Do(func() {
l.state <- nil
super.TryWriteChannel(l.state, nil)
})
return l.kcpListener.AcceptKCP()
}

View File

@ -128,7 +128,7 @@ func (n Network) gNetMode(state chan<- error, srv *Server) {
gnet.WithTicker(true),
gnet.WithMulticore(true),
); err != nil {
srv.gServer.state <- err
super.TryWriteChannel(srv.gServer.state, err)
}
}(srv)
}
@ -143,7 +143,7 @@ func (n Network) grpcMode(state chan<- error, srv *Server) {
lis := (&listener{srv: srv, Listener: l, state: state}).init()
go func(srv *Server, lis *listener) {
if err = srv.grpcServer.Serve(lis); err != nil {
lis.state <- err
super.TryWriteChannel(lis.state, err)
}
}(srv, lis)
}
@ -152,7 +152,7 @@ func (n Network) grpcMode(state chan<- error, srv *Server) {
func (n Network) kcpMode(state chan<- error, srv *Server) {
l, err := kcp.ListenWithOptions(srv.addr, nil, 0, 0)
if err != nil {
state <- err
super.TryWriteChannel(state, err)
return
}
lis := (&listener{srv: srv, kcpListener: l, state: state}).init()
@ -195,7 +195,7 @@ func (n Network) httpMode(state chan<- error, srv *Server) {
srv.httpServer.Addr = srv.addr
l, err := net.Listen(string(NetworkTcp), srv.addr)
if err != nil {
state <- err
super.TryWriteChannel(state, err)
return
}
gin.SetMode(gin.ReleaseMode)
@ -215,7 +215,7 @@ func (n Network) httpMode(state chan<- error, srv *Server) {
err = lis.srv.httpServer.Serve(lis)
}
if err != nil {
lis.state <- err
super.TryWriteChannel(lis.state, err)
}
}((&listener{srv: srv, Listener: l, state: state}).init())
}
@ -224,7 +224,7 @@ func (n Network) httpMode(state chan<- error, srv *Server) {
func (n Network) websocketMode(state chan<- error, srv *Server) {
l, err := net.Listen(string(NetworkTcp), srv.addr)
if err != nil {
state <- err
super.TryWriteChannel(state, err)
return
}
var pattern string
@ -302,7 +302,7 @@ func (n Network) websocketMode(state chan<- error, srv *Server) {
err = http.Serve(lis, nil)
}
if err != nil {
lis.state <- err
super.TryWriteChannel(lis.state, err)
}
}((&listener{srv: srv, Listener: l, state: state}).init())
}

View File

@ -184,7 +184,7 @@ func (srv *Server) Ticker() *timer.Ticker {
// Shutdown 主动停止运行服务器
func (srv *Server) Shutdown() {
srv.systemSignal <- syscall.SIGQUIT
super.TryWriteChannel[os.Signal](srv.systemSignal, syscall.SIGQUIT)
}
// shutdown 停止运行服务器
@ -204,11 +204,7 @@ func (srv *Server) shutdown(err error) {
if srv.multiple == nil {
srv.OnStopEvent()
}
defer func() {
if srv.multipleRuntimeErrorChan != nil {
srv.multipleRuntimeErrorChan <- err
}
}()
defer super.TryWriteChannel(srv.multipleRuntimeErrorChan, err)
srv.cancel()
if srv.gServer != nil {
if shutdownErr := gnet.Stop(context.Background(), fmt.Sprintf("%s://%s", srv.network, srv.addr)); err != nil {
@ -263,7 +259,7 @@ func (srv *Server) shutdown(err error) {
log.Info("Server", log.Any("network", srv.network), log.String("listen", srv.addr),
log.String("action", "shutdown"), log.String("state", "normal"))
}
srv.closeChannel <- struct{}{}
super.TryWriteChannel(srv.closeChannel, struct{}{})
}
// GRPCServer 当网络类型为 NetworkGRPC 时将被允许获取 grpc 服务器,否则将会发生 panic
@ -456,14 +452,14 @@ func (srv *Server) low(message *Message, present time.Time, expect time.Duration
}
// dispatchMessage 消息分发
func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
func (srv *Server) dispatchMessage(dispatcherIns *dispatcher, msg *Message) {
var (
ctx context.Context
cancel context.CancelFunc
)
if srv.deadlockDetect > 0 {
ctx, cancel = context.WithTimeout(context.Background(), srv.deadlockDetect)
go func(ctx context.Context, msg *Message) {
go func(ctx context.Context, srv *Server, msg *Message) {
select {
case <-ctx.Done():
if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) {
@ -471,12 +467,12 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
srv.OnDeadlockDetectEvent(msg)
}
}
}(ctx, msg)
}(ctx, srv, msg)
}
present := time.Now()
if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync {
defer func(msg *Message) {
defer func(cancel context.CancelFunc, srv *Server, dispatcherIns *dispatcher, msg *Message, present time.Time) {
super.Handle(cancel)
if err := super.RecoverTransform(recover()); err != nil {
stack := string(debug.Stack())
@ -485,7 +481,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
srv.OnMessageErrorEvent(msg, err)
}
if msg.t == MessageTypeUniqueAsyncCallback || msg.t == MessageTypeUniqueShuntAsyncCallback {
dispatcher.antiUnique(msg.name)
dispatcherIns.antiUnique(msg.name)
}
srv.low(msg, present, time.Millisecond*100)
@ -494,7 +490,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) {
srv.messagePool.Release(msg)
}
}(msg)
}(cancel, srv, dispatcherIns, msg, present)
} else {
if cancel != nil {
defer cancel()
@ -512,10 +508,10 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
msg.ordinaryHandler()
case MessageTypeAsync, MessageTypeShuntAsync, MessageTypeUniqueAsync, MessageTypeUniqueShuntAsync:
if err := srv.ants.Submit(func() {
defer func() {
defer func(cancel context.CancelFunc, srv *Server, dispatcherIns *dispatcher, msg *Message, present time.Time) {
if err := super.RecoverTransform(recover()); err != nil {
if msg.t == MessageTypeUniqueAsync || msg.t == MessageTypeUniqueShuntAsync {
dispatcher.antiUnique(msg.name)
dispatcherIns.antiUnique(msg.name)
}
stack := string(debug.Stack())
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", stack))
@ -529,7 +525,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) {
srv.messagePool.Release(msg)
}
}()
}(cancel, srv, dispatcherIns, msg, present)
var err error
if msg.exceptionHandler != nil {
err = msg.exceptionHandler()
@ -550,7 +546,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
srv.PushShuntAsyncCallbackMessage(msg.conn, err, msg.errHandler)
return
}
dispatcher.antiUnique(msg.name)
dispatcherIns.antiUnique(msg.name)
if err != nil {
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", string(debug.Stack())))
}