diff --git a/examples/simple-server-cross/cross-a/main.go b/examples/simple-server-cross/cross-a/main.go index 1c01c2a..839f306 100644 --- a/examples/simple-server-cross/cross-a/main.go +++ b/examples/simple-server-cross/cross-a/main.go @@ -13,9 +13,7 @@ func main() { srv := server.New(server.NetworkWebsocket, server.WithCross("nats", 1, cross.NewNats("127.0.0.1:4222")), server.WithTicker(10, false)) srv.RegStartFinishEvent(func(srv *server.Server) { srv.Ticker().Loop("CROSS", timer.Instantly, time.Second, timer.Forever, func() { - if err := srv.PushCrossMessage("nats", 2, []byte("I am cross 1")); err != nil { - panic(err) - } + server.PushCrossMessage(srv, "nats", 2, []byte("I am cross 1")) }) }) srv.RegReceiveCrossPacketEvent(func(srv *server.Server, senderServerId int64, packet []byte) { diff --git a/examples/simple-server-cross/cross-b/main.go b/examples/simple-server-cross/cross-b/main.go index 8df686e..eca8f00 100644 --- a/examples/simple-server-cross/cross-b/main.go +++ b/examples/simple-server-cross/cross-b/main.go @@ -13,9 +13,7 @@ func main() { srv := server.New(server.NetworkWebsocket, server.WithCross("nats", 2, cross.NewNats("127.0.0.1:4222")), server.WithTicker(10, false)) srv.RegStartFinishEvent(func(srv *server.Server) { srv.Ticker().Loop("CROSS", timer.Instantly, time.Second, timer.Forever, func() { - if err := srv.PushCrossMessage("nats", 1, []byte("I am cross 2")); err != nil { - panic(err) - } + server.PushCrossMessage(srv, "nats", 1, []byte("I am cross 2")) }) }) srv.RegReceiveCrossPacketEvent(func(srv *server.Server, senderServerId int64, packet []byte) { diff --git a/server/gnet.go b/server/gnet.go index a21e630..d42d06b 100644 --- a/server/gnet.go +++ b/server/gnet.go @@ -48,7 +48,7 @@ func (slf *gNet) AfterWrite(c gnet.Conn, b []byte) { } func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Action) { - slf.Server.PushMessage(MessageTypePacket, c.Context().(*Conn), bytes.Clone(packet)) + slf.Server.pushMessage(MessageTypePacket, c.Context().(*Conn), bytes.Clone(packet)) return nil, gnet.None } diff --git a/server/message.go b/server/message.go index 7a6a173..fb2122f 100644 --- a/server/message.go +++ b/server/message.go @@ -1,24 +1,22 @@ package server +import "runtime/debug" + const ( // MessageTypePacket 数据包消息类型:该类型的数据将被发送到 ConnectionReceivePacketEvent 进行处理 - // - *server.Conn - // - []byte MessageTypePacket MessageType = iota // MessageTypeError 错误消息类型:根据不同的错误状态,将交由 Server 进行统一处理 - // - error - // - server.MessageErrorAction MessageTypeError // MessageTypeCross 跨服消息类型:将被推送到跨服的 Cross 实现中进行处理 - // - int64(serverId) - // - []byte MessageTypeCross // MessageTypeTicker 定时器消息类型 - // - func() MessageTypeTicker + + // MessageTypeAsync 异步消息类型 + MessageTypeAsync ) var messageNames = map[MessageType]string{ @@ -129,3 +127,42 @@ func (slf MessageType) deconstructTicker(attrs ...any) (caller func()) { } return } + +// PushWebsocketPacketMessage 向特定服务器中推送 WebsocketPacket 消息 +func PushWebsocketPacketMessage(srv *Server, conn *Conn, packet []byte, messageType int) { + msg := srv.messagePool.Get() + msg.t = MessageTypePacket + msg.attrs = []any{conn, packet, messageType} + srv.pushMessage(msg) +} + +// PushPacketMessage 向特定服务器中推送 Packet 消息 +func PushPacketMessage(srv *Server, conn *Conn, packet []byte) { + msg := srv.messagePool.Get() + msg.t = MessageTypePacket + msg.attrs = []any{conn, packet} + srv.pushMessage(msg) +} + +// PushErrorMessage 向特定服务器中推送 Error 消息 +func PushErrorMessage(srv *Server, err error, action MessageErrorAction) { + msg := srv.messagePool.Get() + msg.t = MessageTypeError + msg.attrs = []any{err, action, string(debug.Stack())} + srv.pushMessage(msg) +} + +// PushCrossMessage 向特定服务器中推送 Cross 消息 +func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []byte) { + if len(srv.cross) == 0 { + return + } + _, exist := srv.cross[crossName] + if !exist { + return + } + msg := srv.messagePool.Get() + msg.t = MessageTypeCross + msg.attrs = []any{serverId, packet} + srv.pushMessage(msg) +} diff --git a/server/options.go b/server/options.go index 2145d0a..c014c17 100644 --- a/server/options.go +++ b/server/options.go @@ -45,7 +45,7 @@ func WithTicker(size int, autonomy bool) Option { srv.ticker = timer.GetTicker(size) } else { srv.ticker = timer.GetTicker(size, timer.WithCaller(func(name string, caller func()) { - srv.PushMessage(MessageTypeTicker, caller) + srv.pushMessage(MessageTypeTicker, caller) })) } } @@ -64,7 +64,7 @@ func WithCross(crossName string, serverId int64, cross Cross) Option { } srv.cross[crossName] = cross err := cross.Init(srv, func(serverId int64, packet []byte) { - srv.PushMessage(MessageTypeCross, serverId, packet) + srv.pushMessage(MessageTypeCross, serverId, packet) }) if err != nil { log.Info("Cross", zap.Int64("ServerID", serverId), zap.String("Cross", reflect.TypeOf(cross).String()), zap.String("State", "WaitNatsRun")) diff --git a/server/server.go b/server/server.go index 2aae110..bfd934f 100644 --- a/server/server.go +++ b/server/server.go @@ -19,7 +19,6 @@ import ( "net/http" "os" "os/signal" - "runtime/debug" "strings" "sync/atomic" "syscall" @@ -144,7 +143,7 @@ func (slf *Server) Run(addr string) error { slf.OnStartBeforeEvent() if err := slf.grpcServer.Serve(listener); err != nil { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } }() case NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix: @@ -158,7 +157,7 @@ func (slf *Server) Run(addr string) error { gnet.WithMulticore(true), ); err != nil { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } }) case NetworkKcp: @@ -191,7 +190,7 @@ func (slf *Server) Run(addr string) error { if err != nil { panic(err) } - slf.PushMessage(MessageTypePacket, conn, buf[:n]) + PushPacketMessage(slf, conn, buf[:n]) } }(conn) } @@ -209,12 +208,12 @@ func (slf *Server) Run(addr string) error { if len(slf.certFile)+len(slf.keyFile) > 0 { if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } } else { if err := slf.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } } @@ -275,7 +274,7 @@ func (slf *Server) Run(addr string) error { if len(slf.supportMessageTypes) > 0 && !slf.supportMessageTypes[messageType] { panic(ErrWebsocketIllegalMessageType) } - slf.PushMessage(MessageTypePacket, conn, packet, messageType) + PushWebsocketPacketMessage(slf, conn, packet, messageType) } }) go func() { @@ -284,12 +283,12 @@ func (slf *Server) Run(addr string) error { if len(slf.certFile)+len(slf.keyFile) > 0 { if err := http.ListenAndServeTLS(slf.addr, slf.certFile, slf.keyFile, nil); err != nil { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } } else { if err := http.ListenAndServe(slf.addr, nil); err != nil { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } } @@ -426,30 +425,13 @@ func (slf *Server) HttpRouter() gin.IRouter { return slf.ginServer } -// PushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求 -func (slf *Server) PushMessage(messageType MessageType, attrs ...any) { +// pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求 +func (slf *Server) pushMessage(message *Message) { if slf.messagePool.IsClose() { + slf.messagePool.Release(message) return } - msg := slf.messagePool.Get() - msg.t = messageType - msg.attrs = attrs - if msg.t == MessageTypeError { - msg.attrs = append(msg.attrs, string(debug.Stack())) - } - slf.messageChannel <- msg -} - -// PushCrossMessage 推送跨服消息到特定跨服的服务器中 -func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []byte) error { - if len(slf.cross) == 0 { - return ErrNoSupportCross - } - cross, exist := slf.cross[crossName] - if !exist { - return ErrUnregisteredCrossName - } - return cross.PushMessage(serverId, packet) + slf.messageChannel <- message } // dispatchMessage 消息分发