websocket支持消息类型处理

This commit is contained in:
kercylan98 2023-05-09 18:14:05 +08:00
parent b5c25a3dc8
commit 6b38c7dbc1
4 changed files with 62 additions and 17 deletions

View File

@ -3,13 +3,16 @@ package server
import "errors" import "errors"
var ( var (
ErrConstructed = errors.New("the Server must be constructed using the server.New function") ErrConstructed = errors.New("the Server must be constructed using the server.New function")
ErrCanNotSupportNetwork = errors.New("can not support network") ErrCanNotSupportNetwork = errors.New("can not support network")
ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte") ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte")
ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction") ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)")
ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported") ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction")
ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported")
ErrWebsocketMessageTypeException = errors.New("unknown message type, will not work") ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp")
ErrNotWebsocketUseMessageType = errors.New("message type filtering only supports websocket and does not take effect") ErrWebsocketMessageTypeException = errors.New("unknown message type, will not work")
ErrWebsocketIllegalMessageType = errors.New("illegal message type") ErrNotWebsocketUseMessageType = errors.New("message type filtering only supports websocket and does not take effect")
ErrWebsocketIllegalMessageType = errors.New("illegal message type")
ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register")
ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register")
) )

View File

@ -10,16 +10,18 @@ import (
type StartBeforeEventHandle func(srv *Server) type StartBeforeEventHandle func(srv *Server)
type StartFinishEventHandle func(srv *Server) type StartFinishEventHandle func(srv *Server)
type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte) type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte)
type ConnectionReceiveWebsocketPacketEventHandle func(srv *Server, conn *Conn, packet []byte, messageType int)
type ConnectionOpenedEventHandle func(srv *Server, conn *Conn) type ConnectionOpenedEventHandle func(srv *Server, conn *Conn)
type ConnectionClosedEventHandle func(srv *Server, conn *Conn) type ConnectionClosedEventHandle func(srv *Server, conn *Conn)
type event struct { type event struct {
*Server *Server
startBeforeEventHandles []StartBeforeEventHandle startBeforeEventHandles []StartBeforeEventHandle
startFinishEventHandles []StartFinishEventHandle startFinishEventHandles []StartFinishEventHandle
connectionReceivePacketEventHandles []ConnectionReceivePacketEventHandle connectionReceivePacketEventHandles []ConnectionReceivePacketEventHandle
connectionOpenedEventHandles []ConnectionOpenedEventHandle connectionReceiveWebsocketPacketEventHandles []ConnectionReceiveWebsocketPacketEventHandle
connectionClosedEventHandles []ConnectionClosedEventHandle connectionOpenedEventHandles []ConnectionOpenedEventHandle
connectionClosedEventHandles []ConnectionClosedEventHandle
} }
// RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数
@ -83,6 +85,9 @@ func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacket
if slf.network == NetworkHttp { if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp) panic(ErrNetworkIncompatibleHttp)
} }
if slf.network == NetworkWebsocket {
panic(ErrPleaseUseWebsocketHandle)
}
slf.connectionReceivePacketEventHandles = append(slf.connectionReceivePacketEventHandles, handle) slf.connectionReceivePacketEventHandles = append(slf.connectionReceivePacketEventHandles, handle)
log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String()))
} }
@ -93,6 +98,21 @@ func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) {
} }
} }
// RegConnectionReceiveWebsocketPacketEvent 在接收到Websocket数据包时将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionReceiveWebsocketPacketEvent(handle ConnectionReceiveWebsocketPacketEventHandle) {
if slf.network != NetworkWebsocket {
panic(ErrPleaseUseOrdinaryPacketHandle)
}
slf.connectionReceiveWebsocketPacketEventHandles = append(slf.connectionReceiveWebsocketPacketEventHandles, handle)
log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String()))
}
func (slf *event) OnConnectionReceiveWebsocketPacketEvent(conn *Conn, packet []byte, messageType int) {
for _, handle := range slf.connectionReceiveWebsocketPacketEventHandles {
handle(slf.Server, conn, packet, messageType)
}
}
func (slf *event) check() { func (slf *event) check() {
switch slf.network { switch slf.network {
case NetworkHttp, NetworkGRPC: case NetworkHttp, NetworkGRPC:

View File

@ -48,6 +48,23 @@ func (slf MessageType) String() string {
return messageNames[slf] return messageNames[slf]
} }
func (slf MessageType) deconstructWebSocketPacket(attrs ...any) (conn *Conn, packet []byte, messageType int) {
if len(attrs) != 3 {
panic(ErrWebsocketMessageTypePacketAttrs)
}
var ok bool
if conn, ok = attrs[0].(*Conn); !ok {
panic(ErrWebsocketMessageTypePacketAttrs)
}
if packet, ok = attrs[1].([]byte); !ok {
panic(ErrWebsocketMessageTypePacketAttrs)
}
if messageType, ok = attrs[2].(int); !ok {
panic(ErrWebsocketMessageTypePacketAttrs)
}
return
}
func (slf MessageType) deconstructPacket(attrs ...any) (conn *Conn, packet []byte) { func (slf MessageType) deconstructPacket(attrs ...any) (conn *Conn, packet []byte) {
if len(attrs) != 2 { if len(attrs) != 2 {
panic(ErrMessageTypePacketAttrs) panic(ErrMessageTypePacketAttrs)

View File

@ -236,7 +236,7 @@ func (slf *Server) Run(addr string) error {
if !slf.supportMessageTypes[messageType] { if !slf.supportMessageTypes[messageType] {
panic(ErrWebsocketIllegalMessageType) panic(ErrWebsocketIllegalMessageType)
} }
slf.PushMessage(MessageTypePacket, conn, packet) slf.PushMessage(MessageTypePacket, conn, packet, messageType)
} }
}) })
go func() { go func() {
@ -341,8 +341,13 @@ func (slf *Server) dispatchMessage(msg *message) {
}() }()
switch msg.t { switch msg.t {
case MessageTypePacket: case MessageTypePacket:
conn, packet := msg.t.deconstructPacket(msg.attrs...) if slf.network == NetworkWebsocket {
slf.OnConnectionReceivePacketEvent(conn, packet) conn, packet, messageType := msg.t.deconstructWebSocketPacket(msg.attrs...)
slf.OnConnectionReceiveWebsocketPacketEvent(conn, packet, messageType)
} else {
conn, packet := msg.t.deconstructPacket(msg.attrs...)
slf.OnConnectionReceivePacketEvent(conn, packet)
}
case MessageTypeError: case MessageTypeError:
err, action := msg.t.deconstructError(msg.attrs...) err, action := msg.t.deconstructError(msg.attrs...)
switch action { switch action {