From 0439bffb99cf54802991a15f92a26330568a9d04 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 16 May 2023 10:57:41 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B7=A8=E6=9C=8D=E5=8A=9F=E8=83=BD=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/cross.go | 85 +++++------------------------------------ server/cross_message.go | 8 ---- server/cross_queue.go | 15 -------- server/errors.go | 5 +-- server/event.go | 15 +++----- server/message.go | 11 ++---- server/options.go | 17 +++------ server/server.go | 24 ++++++++---- 8 files changed, 42 insertions(+), 138 deletions(-) delete mode 100644 server/cross_message.go delete mode 100644 server/cross_queue.go diff --git a/server/cross.go b/server/cross.go index 22f37f5..d4b5493 100644 --- a/server/cross.go +++ b/server/cross.go @@ -1,79 +1,12 @@ package server -import ( - "encoding/json" - "github.com/kercylan98/minotaur/utils/log" - "github.com/kercylan98/minotaur/utils/synchronization" - "go.uber.org/zap" -) - -// cross 跨服功能 TODO: 跨服逻辑存在问题 -type cross struct { - server *Server - messageChannel chan *crossMessage - messagePool *synchronization.Pool[*crossMessage] - queues map[CrossQueueName]CrossQueue -} - -func (slf *cross) Run(server *Server, queues ...CrossQueue) error { - slf.server = server - slf.queues = map[CrossQueueName]CrossQueue{} - slf.messagePool = synchronization.NewPool[*crossMessage](100, - func() *crossMessage { - return &crossMessage{} - }, func(data *crossMessage) { - data.toServerId = 0 - data.ServerId = 0 - data.Queue = "" - data.Packet = nil - }, - ) - slf.messageChannel = make(chan *crossMessage, 4096*100) - for i := 0; i < len(slf.queues); i++ { - queue := queues[i] - if _, exist := slf.queues[queue.GetName()]; exist { - return ErrCrossDuplicateQueue - } - if err := queue.Init(); err != nil { - return err - } - slf.queues[queue.GetName()] = queue - queue.Subscribe(slf.server.GetID(), func(bytes []byte) { - message := slf.messagePool.Get() - if err := json.Unmarshal(bytes, message); err != nil { - log.Error("Cross", zap.String("Queue.Receive", string(queue.GetName())), zap.String("Packet", string(bytes)), zap.Error(err)) - return - } - slf.server.PushMessage(MessageTypeCross, message.ServerId, message.Queue, message.Packet) - slf.messagePool.Release(message) - }) - } - go func() { - for message := range slf.messageChannel { - queue := slf.queues[message.Queue] - data, err := json.Marshal(message) - if err != nil { - log.Error("Cross", zap.String("Queue.Push", string(queue.GetName())), zap.String("Packet", string(message.Packet)), zap.Error(err)) - } else if err = queue.Publish(message.toServerId, data); err != nil { - log.Error("Cross", zap.String("Queue.Push", string(queue.GetName())), zap.Error(err)) - } - slf.messagePool.Release(message) - } - }() - return nil -} - -func (slf *cross) PushCrossMessage(queue CrossQueueName, serverId int64, packet []byte) { - message := slf.messagePool.Get() - message.toServerId = serverId - message.ServerId = slf.server.GetID() - message.Queue = queue - message.Packet = packet - slf.messageChannel <- message -} - -func (slf *cross) shutdownCross() { - close(slf.messageChannel) - slf.messagePool.Close() - slf.messagePool = nil +type Cross interface { + // Init 初始化跨服 + // - serverId: 本服id + // - packetHandle.serverId: 发送跨服消息的服务器id + // - packetHandle.packet: 数据包 + Init(serverId int64, packetHandle func(serverId int64, packet []byte)) + // PushMessage 推送跨服消息 + // - serverId: 目标服务器id + PushMessage(serverId int64, packet []byte) error } diff --git a/server/cross_message.go b/server/cross_message.go deleted file mode 100644 index f80208f..0000000 --- a/server/cross_message.go +++ /dev/null @@ -1,8 +0,0 @@ -package server - -type crossMessage struct { - toServerId int64 - ServerId int64 `json:"server_id"` - Queue CrossQueueName `json:"queue"` - Packet []byte `json:"packet"` -} diff --git a/server/cross_queue.go b/server/cross_queue.go deleted file mode 100644 index 1bc6daa..0000000 --- a/server/cross_queue.go +++ /dev/null @@ -1,15 +0,0 @@ -package server - -type CrossQueueName string - -// CrossQueue 跨服消息队列接口 -type CrossQueue interface { - // GetName 获取跨服消息队列名称 - GetName() CrossQueueName - // Init 初始化队列 - Init() error - // Publish 发布跨服消息 - Publish(serverId int64, packet []byte) error - // Subscribe 接收到跨服消息 - Subscribe(serverId int64, packetHandle func([]byte)) -} diff --git a/server/errors.go b/server/errors.go index 1c4637c..1607391 100644 --- a/server/errors.go +++ b/server/errors.go @@ -8,7 +8,7 @@ var ( ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte") ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)") ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction") - ErrMessageTypeCrossErrorAttrs = errors.New("MessageTypeCross must contain int64(server id) and CrossQueueName and []byte") + ErrMessageTypeCrossErrorAttrs = errors.New("MessageTypeCross must contain int64(server id) and []byte") ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported") ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported") ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") @@ -18,6 +18,5 @@ var ( ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register") ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register") ErrOnlySupportSocket = errors.New("only supports Socket programming") - ErrNoSupportGetID = errors.New("the server does not support GetID, please use the WithCross option to create the server") - ErrCrossDuplicateQueue = errors.New("cross duplicate registration Queue") + ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server") ) diff --git a/server/event.go b/server/event.go index 5904698..951dce9 100644 --- a/server/event.go +++ b/server/event.go @@ -23,7 +23,7 @@ type event struct { connectionReceiveWebsocketPacketEventHandles []ConnectionReceiveWebsocketPacketEventHandle connectionOpenedEventHandles []ConnectionOpenedEventHandle connectionClosedEventHandles []ConnectionClosedEventHandle - receiveCrossPacketEventHandles map[CrossQueueName][]ReceiveCrossPacketEventHandle + receiveCrossPacketEventHandles []ReceiveCrossPacketEventHandle } // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 @@ -120,15 +120,12 @@ func (slf *event) OnConnectionReceiveWebsocketPacketEvent(conn *Conn, packet []b } // RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数 -func (slf *event) RegReceiveCrossPacketEvent(queue CrossQueueName, handle ReceiveCrossPacketEventHandle) { - if slf.receiveCrossPacketEventHandles == nil { - slf.receiveCrossPacketEventHandles = map[CrossQueueName][]ReceiveCrossPacketEventHandle{} - } - slf.receiveCrossPacketEventHandles[queue] = append(slf.receiveCrossPacketEventHandles[queue], handle) +func (slf *event) RegReceiveCrossPacketEvent(handle ReceiveCrossPacketEventHandle) { + slf.receiveCrossPacketEventHandles = append(slf.receiveCrossPacketEventHandles, handle) } -func (slf *event) OnReceiveCrossPacketEvent(serverId int64, queue CrossQueueName, packet []byte) { - for _, handle := range slf.receiveCrossPacketEventHandles[queue] { +func (slf *event) OnReceiveCrossPacketEvent(serverId int64, packet []byte) { + for _, handle := range slf.receiveCrossPacketEventHandles { handle(slf.Server, serverId, packet) } } @@ -149,7 +146,7 @@ func (slf *event) check() { } } - if len(slf.receiveCrossPacketEventHandles) > 0 && slf.id == nil { + if len(slf.receiveCrossPacketEventHandles) > 0 && slf.cross == nil { log.Warn("Server", zap.String("ReceiveCrossPacketEvent", "invalid server, not register cross server")) } diff --git a/server/message.go b/server/message.go index fa59818..265536a 100644 --- a/server/message.go +++ b/server/message.go @@ -11,8 +11,8 @@ const ( // - server.MessageErrorAction MessageTypeError - // MessageTypeCross 跨服消息类型:该类型的数据将被发送到对应服务器中进行处理 - // - int64(sender serverId) + // MessageTypeCross 跨服消息类型:将被推送到跨服的 Cross 实现中进行处理 + // - int64(serverId) // - []byte MessageTypeCross ) @@ -98,17 +98,14 @@ func (slf MessageType) deconstructError(attrs ...any) (err error, action Message return } -func (slf MessageType) deconstructCross(attrs ...any) (serverId int64, queue CrossQueueName, packet []byte) { - if len(attrs) != 3 { +func (slf MessageType) deconstructCross(attrs ...any) (serverId int64, packet []byte) { + if len(attrs) != 2 { panic(ErrMessageTypeCrossErrorAttrs) } var ok bool if serverId, ok = attrs[0].(int64); !ok { panic(ErrMessageTypeCrossErrorAttrs) } - if queue, ok = attrs[0].(CrossQueueName); !ok { - panic(ErrMessageTypeCrossErrorAttrs) - } if packet, ok = attrs[1].([]byte); !ok { panic(ErrMessageTypeCrossErrorAttrs) } diff --git a/server/options.go b/server/options.go index b044646..b58e0f9 100644 --- a/server/options.go +++ b/server/options.go @@ -3,10 +3,8 @@ package server import ( "github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/log" - "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc" - "runtime/debug" ) const ( @@ -26,17 +24,12 @@ type Option func(srv *Server) type CrossRegisterHandle func(server *Server) error // WithCross 通过跨服的方式创建服务器 -// - CrossQueue: 跨服队列是用于接收和发送跨服消息的队列接口 -func WithCross(serverId int64, queues ...CrossQueue) Option { +func WithCross(serverId int64, cross Cross) Option { return func(srv *Server) { - srv.id = &serverId - srv.RegStartFinishEvent(func(srv *Server) { - srv.cross = new(cross) - if err := srv.cross.Run(srv, queues...); err != nil { - srv.PushMessage(MessageTypeError, errors.WithMessage(err, string(debug.Stack())), MessageErrorActionShutdown) - return - } - log.Info("Server", zap.Int64("CrossID", serverId)) + srv.id = serverId + srv.cross = cross + srv.cross.Init(serverId, func(serverId int64, packet []byte) { + srv.PushMessage(MessageTypeCross, serverId, packet) }) } } diff --git a/server/server.go b/server/server.go index 60a6eb8..94c5767 100644 --- a/server/server.go +++ b/server/server.go @@ -52,9 +52,9 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { - *event - *cross - id *int64 // 服务器id + *event // 事件 + cross Cross // 跨服 + id int64 // 服务器id network Network // 网络类型 addr string // 侦听地址 options []Option // 选项 @@ -326,10 +326,10 @@ func (slf *Server) IsDev() bool { // GetID 获取服务器id func (slf *Server) GetID() int64 { - if slf.id == nil { - panic(ErrNoSupportGetID) + if slf.cross == nil { + panic(ErrNoSupportCross) } - return *slf.id + return slf.id } // Shutdown 停止运行服务器 @@ -401,6 +401,14 @@ func (slf *Server) PushMessage(messageType MessageType, attrs ...any) { } } +// PushCrossMessage 推送跨服消息 +func (slf *Server) PushCrossMessage(serverId int64, packet []byte) error { + if slf.cross == nil { + return ErrNoSupportCross + } + return slf.cross.PushMessage(serverId, packet) +} + // dispatchMessage 消息分发 func (slf *Server) dispatchMessage(msg *message) { defer func() { @@ -432,8 +440,8 @@ func (slf *Server) dispatchMessage(msg *message) { log.Warn("Server", zap.String("not support message error action", action.String())) } case MessageTypeCross: - serverId, queue, packet := msg.t.deconstructCross(msg.attrs...) - slf.OnReceiveCrossPacketEvent(serverId, queue, packet) + serverId, packet := msg.t.deconstructCross(msg.attrs...) + slf.OnReceiveCrossPacketEvent(serverId, packet) default: log.Warn("Server", zap.String("not support message type", msg.t.String())) }