From b28badbaab27dd22adf3d495144d211a64e6204c Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Mon, 15 May 2023 11:49:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E8=AF=BB=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- game/builtin/player.go | 4 +-- game/player.go | 3 ++- server/conn.go | 28 ++++++++++++++++---- server/errors.go | 29 ++++++++++---------- server/gnet.go | 2 +- server/message.go | 37 ++++++++++++++++++++++++++ server/options.go | 13 +++++++++ server/server.go | 60 ++++++++++++++++++++++++++++-------------- 8 files changed, 133 insertions(+), 43 deletions(-) diff --git a/game/builtin/player.go b/game/builtin/player.go index f53b003..e13ff83 100644 --- a/game/builtin/player.go +++ b/game/builtin/player.go @@ -20,8 +20,8 @@ func (slf *Player[ID]) GetID() ID { } // Send 向该玩家发送数据 -func (slf *Player[ID]) Send(packet []byte) error { - return slf.conn.Write(packet) +func (slf *Player[ID]) Send(packet []byte, messageType ...int) { + slf.conn.Write(packet, messageType...) } // Close 关闭玩家 diff --git a/game/player.go b/game/player.go index 1762a84..dd8ec32 100644 --- a/game/player.go +++ b/game/player.go @@ -5,7 +5,8 @@ type Player[ID comparable] interface { // GetID 用户玩家ID GetID() ID // Send 发送数据包 - Send(packet []byte) error + // - messageType: websocket模式中指定消息类型 + Send(packet []byte, messageType ...int) // Close 关闭玩家并且释放其资源 Close() } diff --git a/server/conn.go b/server/conn.go index 5f58f0c..5aa560c 100644 --- a/server/conn.go +++ b/server/conn.go @@ -9,8 +9,9 @@ import ( ) // newKcpConn 创建一个处理KCP的连接 -func newKcpConn(session *kcp.UDPSession) *Conn { +func newKcpConn(server *Server, session *kcp.UDPSession) *Conn { c := &Conn{ + server: server, remoteAddr: session.RemoteAddr(), ip: session.RemoteAddr().String(), kcp: session, @@ -27,8 +28,9 @@ func newKcpConn(session *kcp.UDPSession) *Conn { } // newKcpConn 创建一个处理GNet的连接 -func newGNetConn(conn gnet.Conn) *Conn { +func newGNetConn(server *Server, conn gnet.Conn) *Conn { c := &Conn{ + server: server, remoteAddr: conn.RemoteAddr(), ip: conn.RemoteAddr().String(), gn: conn, @@ -44,8 +46,9 @@ func newGNetConn(conn gnet.Conn) *Conn { } // newKcpConn 创建一个处理WebSocket的连接 -func newWebsocketConn(ws *websocket.Conn, ip string) *Conn { +func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn { return &Conn{ + server: server, remoteAddr: ws.RemoteAddr(), ip: ip, ws: ws, @@ -58,6 +61,7 @@ func newWebsocketConn(ws *websocket.Conn, ip string) *Conn { // Conn 服务器连接 type Conn struct { + server *Server remoteAddr net.Addr ip string ws *websocket.Conn @@ -80,8 +84,17 @@ func (slf *Conn) GetIP() string { } // Write 向连接中写入数据 -func (slf *Conn) Write(data []byte) error { - return slf.write(data) +// - messageType: websocket模式中指定消息类型 +func (slf *Conn) Write(data []byte, messageType ...int) { + if slf.IsWebsocket() { + if len(messageType) > 0 { + slf.server.PushMessage(MessageTypeWritePacket, slf, data, messageType[0]) + } else { + slf.server.PushMessage(MessageTypeWritePacket, slf, data, -1) + } + } else { + slf.server.PushMessage(MessageTypeWritePacket, slf, data) + } } // Close 关闭连接 @@ -114,3 +127,8 @@ func (slf *Conn) ReleaseData() *Conn { } return slf } + +// IsWebsocket 是否是websocket连接 +func (slf *Conn) IsWebsocket() bool { + return slf.server.network == NetworkWebsocket +} diff --git a/server/errors.go b/server/errors.go index cf1da3b..4f655ef 100644 --- a/server/errors.go +++ b/server/errors.go @@ -3,18 +3,19 @@ package server import "errors" var ( - ErrConstructed = errors.New("the Server must be constructed using the server.New function") - ErrCanNotSupportNetwork = errors.New("can not support network") - ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte") - ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)") - ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction") - ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported") - ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported") - ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") - ErrWebsocketMessageTypeException = errors.New("unknown message type, will not work") - ErrNotWebsocketUseMessageType = errors.New("message type filtering only supports websocket and does not take effect") - ErrWebsocketIllegalMessageType = errors.New("illegal message type") - ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register") - ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register") - ErrOnlySupportSocket = errors.New("only supports Socket programming") + ErrConstructed = errors.New("the Server must be constructed using the server.New function") + ErrCanNotSupportNetwork = errors.New("can not support network") + ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte") + ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)") + ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction") + ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported") + ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported") + ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") + ErrWebsocketMessageTypeException = errors.New("unknown message type, will not work") + ErrNotWebsocketUseMessageType = errors.New("message type filtering only supports websocket and does not take effect") + ErrWebsocketIllegalMessageType = errors.New("illegal message type") + ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register") + ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register") + ErrOnlySupportSocket = errors.New("only supports Socket programming") + ErrWebsocketMessageTypeWritePacketAttrs = errors.New("MessageTypeWritePacket must contain *Conn and []byte or *Conn and []byte and MessageType(int)") ) diff --git a/server/gnet.go b/server/gnet.go index 04b1f8b..675490a 100644 --- a/server/gnet.go +++ b/server/gnet.go @@ -18,7 +18,7 @@ func (slf *gNet) OnShutdown(server gnet.Server) { } func (slf *gNet) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) { - conn := newGNetConn(c) + conn := newGNetConn(slf.Server, c) c.SetContext(conn) slf.OnConnectionOpenedEvent(conn) return diff --git a/server/message.go b/server/message.go index d9da968..0e74361 100644 --- a/server/message.go +++ b/server/message.go @@ -6,6 +6,11 @@ const ( // - []byte MessageTypePacket MessageType = iota + // MessageTypeWritePacket 数据包消息类型:该类型的消息将对客户端进行写入 + // - *server.Conn + // - []byte + MessageTypeWritePacket + // MessageTypeError 错误消息类型:根据不同的错误状态,将交由 Server 进行统一处理 // - error // - server.MessageErrorAction @@ -79,6 +84,38 @@ func (slf MessageType) deconstructPacket(attrs ...any) (conn *Conn, packet []byt return } +func (slf MessageType) deconstructWebSocketWritePacket(attrs ...any) (conn *Conn, packet []byte, messageType int) { + messageType = -1 + if len(attrs) != 3 { + panic(ErrWebsocketMessageTypeWritePacketAttrs) + } + var ok bool + if conn, ok = attrs[0].(*Conn); !ok { + panic(ErrWebsocketMessageTypeWritePacketAttrs) + } + if packet, ok = attrs[1].([]byte); !ok { + panic(ErrWebsocketMessageTypeWritePacketAttrs) + } + if messageType, ok = attrs[2].(int); !ok { + panic(ErrWebsocketMessageTypeWritePacketAttrs) + } + return +} + +func (slf MessageType) deconstructWritePacket(attrs ...any) (conn *Conn, packet []byte) { + if len(attrs) != 2 { + panic(ErrMessageTypePacketAttrs) + } + var ok bool + if conn, ok = attrs[0].(*Conn); !ok { + panic(ErrMessageTypePacketAttrs) + } + if packet, ok = attrs[1].([]byte); !ok { + panic(ErrMessageTypePacketAttrs) + } + return +} + func (slf MessageType) deconstructError(attrs ...any) (err error, action MessageErrorAction) { if len(attrs) != 2 { panic(ErrMessageTypeErrorAttrs) diff --git a/server/options.go b/server/options.go index 43e8a58..88a9360 100644 --- a/server/options.go +++ b/server/options.go @@ -70,6 +70,19 @@ func WithProd() Option { } } +// WithWebsocketWriteMessageType 设置客户端写入的Websocket消息类型 +// - 默认: WebsocketMessageTypeBinary +func WithWebsocketWriteMessageType(messageType int) Option { + return func(srv *Server) { + switch messageType { + case WebsocketMessageTypeText, WebsocketMessageTypeBinary, WebsocketMessageTypeClose, WebsocketMessageTypePing, WebsocketMessageTypePong: + srv.websocketWriteMessageType = messageType + default: + log.Warn("WithWebsocketWriteMessageType", zap.Int("MessageType", messageType), zap.Error(ErrWebsocketMessageTypeException)) + } + } +} + // WithWebsocketMessageType 设置仅支持特定类型的Websocket消息 func WithWebsocketMessageType(messageTypes ...int) Option { return func(srv *Server) { diff --git a/server/server.go b/server/server.go index 61cdba4..3d5d951 100644 --- a/server/server.go +++ b/server/server.go @@ -27,11 +27,12 @@ import ( // New 根据特定网络类型创建一个服务器 func New(network Network, options ...Option) *Server { server := &Server{ - event: &event{}, - network: network, - options: options, - core: 1, - closeChannel: make(chan struct{}), + event: &event{}, + network: network, + options: options, + core: 1, + closeChannel: make(chan struct{}), + websocketWriteMessageType: WebsocketMessageTypeBinary, } server.event.Server = server @@ -63,16 +64,17 @@ type Server struct { isShutdown atomic.Bool // 是否已关闭 closeChannel chan struct{} // 关闭信号 - gServer *gNet // TCP或UDP模式下的服务器 - messagePool *synchronization.Pool[*message] // 消息池 - messagePoolSize int // 消息池大小 - messageChannel chan *message // 消息管道 - initMessageChannel bool // 消息管道是否已经初始化 - multiple bool // 是否为多服务器模式下运行 - prod bool // 是否为生产模式 - core int // 消息处理核心数 - diversionMessageChannels []chan *message // 分流消息管道 - diversionConsistency *hash.Consistency // 哈希一致性分流器 + gServer *gNet // TCP或UDP模式下的服务器 + messagePool *synchronization.Pool[*message] // 消息池 + messagePoolSize int // 消息池大小 + messageChannel chan *message // 消息管道 + initMessageChannel bool // 消息管道是否已经初始化 + multiple bool // 是否为多服务器模式下运行 + prod bool // 是否为生产模式 + core int // 消息处理核心数 + diversionMessageChannels []chan *message // 分流消息管道 + diversionConsistency *hash.Consistency // 哈希一致性分流器 + websocketWriteMessageType int // websocket写入的消息类型 } // Run 使用特定地址运行服务器 @@ -165,7 +167,7 @@ func (slf *Server) Run(addr string) error { continue } - conn := newKcpConn(session) + conn := newKcpConn(slf, session) slf.OnConnectionOpenedEvent(conn) go func(conn *Conn) { @@ -236,7 +238,7 @@ func (slf *Server) Run(addr string) error { } } - conn := newWebsocketConn(ws, ip) + conn := newWebsocketConn(slf, ws, ip) for k, v := range request.URL.Query() { if len(v) == 1 { conn.SetData(k, v) @@ -403,14 +405,32 @@ func (slf *Server) dispatchMessage(msg *message) { case MessageTypePacket: if slf.network == NetworkWebsocket { conn, packet, messageType := msg.t.deconstructWebSocketPacket(msg.attrs...) - if slf.diversionConsistency != nil { - slf.diversionConsistency.PickNode(conn) - } slf.OnConnectionReceiveWebsocketPacketEvent(conn, packet, messageType) } else { conn, packet := msg.t.deconstructPacket(msg.attrs...) slf.OnConnectionReceivePacketEvent(conn, packet) } + case MessageTypeWritePacket: + if slf.network == NetworkWebsocket { + conn, packet, messageType := msg.t.deconstructWebSocketWritePacket(msg.attrs...) + if messageType == -1 { + messageType = slf.websocketWriteMessageType + } + if err := conn.ws.WriteMessage(messageType, packet); err != nil { + log.Debug("Server", zap.String("ConnID", conn.GetID()), zap.Error(err)) + } + } else { + var err error + conn, packet := msg.t.deconstructPacket(msg.attrs...) + if conn.gn != nil { + err = conn.gn.AsyncWrite(packet) + } else if conn.kcp != nil { + _, err = conn.kcp.Write(packet) + } + if err != nil { + log.Debug("Server", zap.String("ConnID", conn.GetID()), zap.Error(err)) + } + } case MessageTypeError: err, action := msg.t.deconstructError(msg.attrs...) switch action {