服务器连接异步读写

This commit is contained in:
kercylan98 2023-05-15 11:49:36 +08:00
parent 926b69bee1
commit b28badbaab
8 changed files with 133 additions and 43 deletions

View File

@ -20,8 +20,8 @@ func (slf *Player[ID]) GetID() ID {
} }
// Send 向该玩家发送数据 // Send 向该玩家发送数据
func (slf *Player[ID]) Send(packet []byte) error { func (slf *Player[ID]) Send(packet []byte, messageType ...int) {
return slf.conn.Write(packet) slf.conn.Write(packet, messageType...)
} }
// Close 关闭玩家 // Close 关闭玩家

View File

@ -5,7 +5,8 @@ type Player[ID comparable] interface {
// GetID 用户玩家ID // GetID 用户玩家ID
GetID() ID GetID() ID
// Send 发送数据包 // Send 发送数据包
Send(packet []byte) error // - messageType: websocket模式中指定消息类型
Send(packet []byte, messageType ...int)
// Close 关闭玩家并且释放其资源 // Close 关闭玩家并且释放其资源
Close() Close()
} }

View File

@ -9,8 +9,9 @@ import (
) )
// newKcpConn 创建一个处理KCP的连接 // newKcpConn 创建一个处理KCP的连接
func newKcpConn(session *kcp.UDPSession) *Conn { func newKcpConn(server *Server, session *kcp.UDPSession) *Conn {
c := &Conn{ c := &Conn{
server: server,
remoteAddr: session.RemoteAddr(), remoteAddr: session.RemoteAddr(),
ip: session.RemoteAddr().String(), ip: session.RemoteAddr().String(),
kcp: session, kcp: session,
@ -27,8 +28,9 @@ func newKcpConn(session *kcp.UDPSession) *Conn {
} }
// newKcpConn 创建一个处理GNet的连接 // newKcpConn 创建一个处理GNet的连接
func newGNetConn(conn gnet.Conn) *Conn { func newGNetConn(server *Server, conn gnet.Conn) *Conn {
c := &Conn{ c := &Conn{
server: server,
remoteAddr: conn.RemoteAddr(), remoteAddr: conn.RemoteAddr(),
ip: conn.RemoteAddr().String(), ip: conn.RemoteAddr().String(),
gn: conn, gn: conn,
@ -44,8 +46,9 @@ func newGNetConn(conn gnet.Conn) *Conn {
} }
// newKcpConn 创建一个处理WebSocket的连接 // newKcpConn 创建一个处理WebSocket的连接
func newWebsocketConn(ws *websocket.Conn, ip string) *Conn { func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn {
return &Conn{ return &Conn{
server: server,
remoteAddr: ws.RemoteAddr(), remoteAddr: ws.RemoteAddr(),
ip: ip, ip: ip,
ws: ws, ws: ws,
@ -58,6 +61,7 @@ func newWebsocketConn(ws *websocket.Conn, ip string) *Conn {
// Conn 服务器连接 // Conn 服务器连接
type Conn struct { type Conn struct {
server *Server
remoteAddr net.Addr remoteAddr net.Addr
ip string ip string
ws *websocket.Conn ws *websocket.Conn
@ -80,8 +84,17 @@ func (slf *Conn) GetIP() string {
} }
// Write 向连接中写入数据 // Write 向连接中写入数据
func (slf *Conn) Write(data []byte) error { // - messageType: websocket模式中指定消息类型
return slf.write(data) func (slf *Conn) Write(data []byte, messageType ...int) {
if slf.IsWebsocket() {
if len(messageType) > 0 {
slf.server.PushMessage(MessageTypeWritePacket, slf, data, messageType[0])
} else {
slf.server.PushMessage(MessageTypeWritePacket, slf, data, -1)
}
} else {
slf.server.PushMessage(MessageTypeWritePacket, slf, data)
}
} }
// Close 关闭连接 // Close 关闭连接
@ -114,3 +127,8 @@ func (slf *Conn) ReleaseData() *Conn {
} }
return slf return slf
} }
// IsWebsocket 是否是websocket连接
func (slf *Conn) IsWebsocket() bool {
return slf.server.network == NetworkWebsocket
}

View File

@ -3,18 +3,19 @@ package server
import "errors" import "errors"
var ( var (
ErrConstructed = errors.New("the Server must be constructed using the server.New function") ErrConstructed = errors.New("the Server must be constructed using the server.New function")
ErrCanNotSupportNetwork = errors.New("can not support network") ErrCanNotSupportNetwork = errors.New("can not support network")
ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte") ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte")
ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)") ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)")
ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction") ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction")
ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported") ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported")
ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported") ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported")
ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp")
ErrWebsocketMessageTypeException = errors.New("unknown message type, will not work") ErrWebsocketMessageTypeException = errors.New("unknown message type, will not work")
ErrNotWebsocketUseMessageType = errors.New("message type filtering only supports websocket and does not take effect") ErrNotWebsocketUseMessageType = errors.New("message type filtering only supports websocket and does not take effect")
ErrWebsocketIllegalMessageType = errors.New("illegal message type") ErrWebsocketIllegalMessageType = errors.New("illegal message type")
ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register") ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register")
ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register") ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register")
ErrOnlySupportSocket = errors.New("only supports Socket programming") ErrOnlySupportSocket = errors.New("only supports Socket programming")
ErrWebsocketMessageTypeWritePacketAttrs = errors.New("MessageTypeWritePacket must contain *Conn and []byte or *Conn and []byte and MessageType(int)")
) )

View File

@ -18,7 +18,7 @@ func (slf *gNet) OnShutdown(server gnet.Server) {
} }
func (slf *gNet) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) { func (slf *gNet) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
conn := newGNetConn(c) conn := newGNetConn(slf.Server, c)
c.SetContext(conn) c.SetContext(conn)
slf.OnConnectionOpenedEvent(conn) slf.OnConnectionOpenedEvent(conn)
return return

View File

@ -6,6 +6,11 @@ const (
// - []byte // - []byte
MessageTypePacket MessageType = iota MessageTypePacket MessageType = iota
// MessageTypeWritePacket 数据包消息类型:该类型的消息将对客户端进行写入
// - *server.Conn
// - []byte
MessageTypeWritePacket
// MessageTypeError 错误消息类型:根据不同的错误状态,将交由 Server 进行统一处理 // MessageTypeError 错误消息类型:根据不同的错误状态,将交由 Server 进行统一处理
// - error // - error
// - server.MessageErrorAction // - server.MessageErrorAction
@ -79,6 +84,38 @@ func (slf MessageType) deconstructPacket(attrs ...any) (conn *Conn, packet []byt
return return
} }
func (slf MessageType) deconstructWebSocketWritePacket(attrs ...any) (conn *Conn, packet []byte, messageType int) {
messageType = -1
if len(attrs) != 3 {
panic(ErrWebsocketMessageTypeWritePacketAttrs)
}
var ok bool
if conn, ok = attrs[0].(*Conn); !ok {
panic(ErrWebsocketMessageTypeWritePacketAttrs)
}
if packet, ok = attrs[1].([]byte); !ok {
panic(ErrWebsocketMessageTypeWritePacketAttrs)
}
if messageType, ok = attrs[2].(int); !ok {
panic(ErrWebsocketMessageTypeWritePacketAttrs)
}
return
}
func (slf MessageType) deconstructWritePacket(attrs ...any) (conn *Conn, packet []byte) {
if len(attrs) != 2 {
panic(ErrMessageTypePacketAttrs)
}
var ok bool
if conn, ok = attrs[0].(*Conn); !ok {
panic(ErrMessageTypePacketAttrs)
}
if packet, ok = attrs[1].([]byte); !ok {
panic(ErrMessageTypePacketAttrs)
}
return
}
func (slf MessageType) deconstructError(attrs ...any) (err error, action MessageErrorAction) { func (slf MessageType) deconstructError(attrs ...any) (err error, action MessageErrorAction) {
if len(attrs) != 2 { if len(attrs) != 2 {
panic(ErrMessageTypeErrorAttrs) panic(ErrMessageTypeErrorAttrs)

View File

@ -70,6 +70,19 @@ func WithProd() Option {
} }
} }
// WithWebsocketWriteMessageType 设置客户端写入的Websocket消息类型
// - 默认: WebsocketMessageTypeBinary
func WithWebsocketWriteMessageType(messageType int) Option {
return func(srv *Server) {
switch messageType {
case WebsocketMessageTypeText, WebsocketMessageTypeBinary, WebsocketMessageTypeClose, WebsocketMessageTypePing, WebsocketMessageTypePong:
srv.websocketWriteMessageType = messageType
default:
log.Warn("WithWebsocketWriteMessageType", zap.Int("MessageType", messageType), zap.Error(ErrWebsocketMessageTypeException))
}
}
}
// WithWebsocketMessageType 设置仅支持特定类型的Websocket消息 // WithWebsocketMessageType 设置仅支持特定类型的Websocket消息
func WithWebsocketMessageType(messageTypes ...int) Option { func WithWebsocketMessageType(messageTypes ...int) Option {
return func(srv *Server) { return func(srv *Server) {

View File

@ -27,11 +27,12 @@ import (
// New 根据特定网络类型创建一个服务器 // New 根据特定网络类型创建一个服务器
func New(network Network, options ...Option) *Server { func New(network Network, options ...Option) *Server {
server := &Server{ server := &Server{
event: &event{}, event: &event{},
network: network, network: network,
options: options, options: options,
core: 1, core: 1,
closeChannel: make(chan struct{}), closeChannel: make(chan struct{}),
websocketWriteMessageType: WebsocketMessageTypeBinary,
} }
server.event.Server = server server.event.Server = server
@ -63,16 +64,17 @@ type Server struct {
isShutdown atomic.Bool // 是否已关闭 isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号 closeChannel chan struct{} // 关闭信号
gServer *gNet // TCP或UDP模式下的服务器 gServer *gNet // TCP或UDP模式下的服务器
messagePool *synchronization.Pool[*message] // 消息池 messagePool *synchronization.Pool[*message] // 消息池
messagePoolSize int // 消息池大小 messagePoolSize int // 消息池大小
messageChannel chan *message // 消息管道 messageChannel chan *message // 消息管道
initMessageChannel bool // 消息管道是否已经初始化 initMessageChannel bool // 消息管道是否已经初始化
multiple bool // 是否为多服务器模式下运行 multiple bool // 是否为多服务器模式下运行
prod bool // 是否为生产模式 prod bool // 是否为生产模式
core int // 消息处理核心数 core int // 消息处理核心数
diversionMessageChannels []chan *message // 分流消息管道 diversionMessageChannels []chan *message // 分流消息管道
diversionConsistency *hash.Consistency // 哈希一致性分流器 diversionConsistency *hash.Consistency // 哈希一致性分流器
websocketWriteMessageType int // websocket写入的消息类型
} }
// Run 使用特定地址运行服务器 // Run 使用特定地址运行服务器
@ -165,7 +167,7 @@ func (slf *Server) Run(addr string) error {
continue continue
} }
conn := newKcpConn(session) conn := newKcpConn(slf, session)
slf.OnConnectionOpenedEvent(conn) slf.OnConnectionOpenedEvent(conn)
go func(conn *Conn) { go func(conn *Conn) {
@ -236,7 +238,7 @@ func (slf *Server) Run(addr string) error {
} }
} }
conn := newWebsocketConn(ws, ip) conn := newWebsocketConn(slf, ws, ip)
for k, v := range request.URL.Query() { for k, v := range request.URL.Query() {
if len(v) == 1 { if len(v) == 1 {
conn.SetData(k, v) conn.SetData(k, v)
@ -403,14 +405,32 @@ func (slf *Server) dispatchMessage(msg *message) {
case MessageTypePacket: case MessageTypePacket:
if slf.network == NetworkWebsocket { if slf.network == NetworkWebsocket {
conn, packet, messageType := msg.t.deconstructWebSocketPacket(msg.attrs...) conn, packet, messageType := msg.t.deconstructWebSocketPacket(msg.attrs...)
if slf.diversionConsistency != nil {
slf.diversionConsistency.PickNode(conn)
}
slf.OnConnectionReceiveWebsocketPacketEvent(conn, packet, messageType) slf.OnConnectionReceiveWebsocketPacketEvent(conn, packet, messageType)
} else { } else {
conn, packet := msg.t.deconstructPacket(msg.attrs...) conn, packet := msg.t.deconstructPacket(msg.attrs...)
slf.OnConnectionReceivePacketEvent(conn, packet) slf.OnConnectionReceivePacketEvent(conn, packet)
} }
case MessageTypeWritePacket:
if slf.network == NetworkWebsocket {
conn, packet, messageType := msg.t.deconstructWebSocketWritePacket(msg.attrs...)
if messageType == -1 {
messageType = slf.websocketWriteMessageType
}
if err := conn.ws.WriteMessage(messageType, packet); err != nil {
log.Debug("Server", zap.String("ConnID", conn.GetID()), zap.Error(err))
}
} else {
var err error
conn, packet := msg.t.deconstructPacket(msg.attrs...)
if conn.gn != nil {
err = conn.gn.AsyncWrite(packet)
} else if conn.kcp != nil {
_, err = conn.kcp.Write(packet)
}
if err != nil {
log.Debug("Server", zap.String("ConnID", conn.GetID()), zap.Error(err))
}
}
case MessageTypeError: case MessageTypeError:
err, action := msg.t.deconstructError(msg.attrs...) err, action := msg.t.deconstructError(msg.attrs...)
switch action { switch action {