From 472fdc3a188e34fcf2bf52daa7b4e9d1129e3599 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Fri, 29 Dec 2023 12:02:12 +0800 Subject: [PATCH] =?UTF-8?q?other:=20server=20=E5=8C=85=E9=83=A8=E5=88=86?= =?UTF-8?q?=E5=86=85=E5=AE=B9=E5=8F=AF=E8=AF=BB=E6=80=A7=E4=BC=98=E5=8C=96?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=81=A5=E5=A3=AE=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/listener.go | 5 +++-- server/network.go | 14 +++++++------- server/server.go | 30 +++++++++++++----------------- 3 files changed, 23 insertions(+), 26 deletions(-) diff --git a/server/listener.go b/server/listener.go index e75ee92..28e7320 100644 --- a/server/listener.go +++ b/server/listener.go @@ -1,6 +1,7 @@ package server import ( + "github.com/kercylan98/minotaur/utils/super" "github.com/xtaci/kcp-go/v5" "net" "sync" @@ -21,14 +22,14 @@ func (l *listener) init() *listener { func (l *listener) Accept() (net.Conn, error) { l.once.Do(func() { - l.state <- nil + super.TryWriteChannel(l.state, nil) }) return l.Listener.Accept() } func (l *listener) AcceptKCP() (*kcp.UDPSession, error) { l.once.Do(func() { - l.state <- nil + super.TryWriteChannel(l.state, nil) }) return l.kcpListener.AcceptKCP() } diff --git a/server/network.go b/server/network.go index 9f54d9d..e9a6ccf 100644 --- a/server/network.go +++ b/server/network.go @@ -128,7 +128,7 @@ func (n Network) gNetMode(state chan<- error, srv *Server) { gnet.WithTicker(true), gnet.WithMulticore(true), ); err != nil { - srv.gServer.state <- err + super.TryWriteChannel(srv.gServer.state, err) } }(srv) } @@ -143,7 +143,7 @@ func (n Network) grpcMode(state chan<- error, srv *Server) { lis := (&listener{srv: srv, Listener: l, state: state}).init() go func(srv *Server, lis *listener) { if err = srv.grpcServer.Serve(lis); err != nil { - lis.state <- err + super.TryWriteChannel(lis.state, err) } }(srv, lis) } @@ -152,7 +152,7 @@ func (n Network) grpcMode(state chan<- error, srv *Server) { func (n Network) kcpMode(state chan<- error, srv *Server) { l, err := kcp.ListenWithOptions(srv.addr, nil, 0, 0) if err != nil { - state <- err + super.TryWriteChannel(state, err) return } lis := (&listener{srv: srv, kcpListener: l, state: state}).init() @@ -195,7 +195,7 @@ func (n Network) httpMode(state chan<- error, srv *Server) { srv.httpServer.Addr = srv.addr l, err := net.Listen(string(NetworkTcp), srv.addr) if err != nil { - state <- err + super.TryWriteChannel(state, err) return } gin.SetMode(gin.ReleaseMode) @@ -215,7 +215,7 @@ func (n Network) httpMode(state chan<- error, srv *Server) { err = lis.srv.httpServer.Serve(lis) } if err != nil { - lis.state <- err + super.TryWriteChannel(lis.state, err) } }((&listener{srv: srv, Listener: l, state: state}).init()) } @@ -224,7 +224,7 @@ func (n Network) httpMode(state chan<- error, srv *Server) { func (n Network) websocketMode(state chan<- error, srv *Server) { l, err := net.Listen(string(NetworkTcp), srv.addr) if err != nil { - state <- err + super.TryWriteChannel(state, err) return } var pattern string @@ -302,7 +302,7 @@ func (n Network) websocketMode(state chan<- error, srv *Server) { err = http.Serve(lis, nil) } if err != nil { - lis.state <- err + super.TryWriteChannel(lis.state, err) } }((&listener{srv: srv, Listener: l, state: state}).init()) } diff --git a/server/server.go b/server/server.go index 3aff595..a860480 100644 --- a/server/server.go +++ b/server/server.go @@ -184,7 +184,7 @@ func (srv *Server) Ticker() *timer.Ticker { // Shutdown 主动停止运行服务器 func (srv *Server) Shutdown() { - srv.systemSignal <- syscall.SIGQUIT + super.TryWriteChannel[os.Signal](srv.systemSignal, syscall.SIGQUIT) } // shutdown 停止运行服务器 @@ -204,11 +204,7 @@ func (srv *Server) shutdown(err error) { if srv.multiple == nil { srv.OnStopEvent() } - defer func() { - if srv.multipleRuntimeErrorChan != nil { - srv.multipleRuntimeErrorChan <- err - } - }() + defer super.TryWriteChannel(srv.multipleRuntimeErrorChan, err) srv.cancel() if srv.gServer != nil { if shutdownErr := gnet.Stop(context.Background(), fmt.Sprintf("%s://%s", srv.network, srv.addr)); err != nil { @@ -263,7 +259,7 @@ func (srv *Server) shutdown(err error) { log.Info("Server", log.Any("network", srv.network), log.String("listen", srv.addr), log.String("action", "shutdown"), log.String("state", "normal")) } - srv.closeChannel <- struct{}{} + super.TryWriteChannel(srv.closeChannel, struct{}{}) } // GRPCServer 当网络类型为 NetworkGRPC 时将被允许获取 grpc 服务器,否则将会发生 panic @@ -456,14 +452,14 @@ func (srv *Server) low(message *Message, present time.Time, expect time.Duration } // dispatchMessage 消息分发 -func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { +func (srv *Server) dispatchMessage(dispatcherIns *dispatcher, msg *Message) { var ( ctx context.Context cancel context.CancelFunc ) if srv.deadlockDetect > 0 { ctx, cancel = context.WithTimeout(context.Background(), srv.deadlockDetect) - go func(ctx context.Context, msg *Message) { + go func(ctx context.Context, srv *Server, msg *Message) { select { case <-ctx.Done(): if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) { @@ -471,12 +467,12 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { srv.OnDeadlockDetectEvent(msg) } } - }(ctx, msg) + }(ctx, srv, msg) } present := time.Now() if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync { - defer func(msg *Message) { + defer func(cancel context.CancelFunc, srv *Server, dispatcherIns *dispatcher, msg *Message, present time.Time) { super.Handle(cancel) if err := super.RecoverTransform(recover()); err != nil { stack := string(debug.Stack()) @@ -485,7 +481,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { srv.OnMessageErrorEvent(msg, err) } if msg.t == MessageTypeUniqueAsyncCallback || msg.t == MessageTypeUniqueShuntAsyncCallback { - dispatcher.antiUnique(msg.name) + dispatcherIns.antiUnique(msg.name) } srv.low(msg, present, time.Millisecond*100) @@ -494,7 +490,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) { srv.messagePool.Release(msg) } - }(msg) + }(cancel, srv, dispatcherIns, msg, present) } else { if cancel != nil { defer cancel() @@ -512,10 +508,10 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { msg.ordinaryHandler() case MessageTypeAsync, MessageTypeShuntAsync, MessageTypeUniqueAsync, MessageTypeUniqueShuntAsync: if err := srv.ants.Submit(func() { - defer func() { + defer func(cancel context.CancelFunc, srv *Server, dispatcherIns *dispatcher, msg *Message, present time.Time) { if err := super.RecoverTransform(recover()); err != nil { if msg.t == MessageTypeUniqueAsync || msg.t == MessageTypeUniqueShuntAsync { - dispatcher.antiUnique(msg.name) + dispatcherIns.antiUnique(msg.name) } stack := string(debug.Stack()) log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", stack)) @@ -529,7 +525,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) { srv.messagePool.Release(msg) } - }() + }(cancel, srv, dispatcherIns, msg, present) var err error if msg.exceptionHandler != nil { err = msg.exceptionHandler() @@ -550,7 +546,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { srv.PushShuntAsyncCallbackMessage(msg.conn, err, msg.errHandler) return } - dispatcher.antiUnique(msg.name) + dispatcherIns.antiUnique(msg.name) if err != nil { log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", string(debug.Stack()))) }