跨服功能实现

This commit is contained in:
kercylan98 2023-05-16 10:57:41 +08:00
parent db86a646f4
commit 0439bffb99
8 changed files with 42 additions and 138 deletions

View File

@ -1,79 +1,12 @@
package server package server
import ( type Cross interface {
"encoding/json" // Init 初始化跨服
"github.com/kercylan98/minotaur/utils/log" // - serverId: 本服id
"github.com/kercylan98/minotaur/utils/synchronization" // - packetHandle.serverId: 发送跨服消息的服务器id
"go.uber.org/zap" // - packetHandle.packet: 数据包
) Init(serverId int64, packetHandle func(serverId int64, packet []byte))
// PushMessage 推送跨服消息
// cross 跨服功能 TODO: 跨服逻辑存在问题 // - serverId: 目标服务器id
type cross struct { PushMessage(serverId int64, packet []byte) error
server *Server
messageChannel chan *crossMessage
messagePool *synchronization.Pool[*crossMessage]
queues map[CrossQueueName]CrossQueue
}
func (slf *cross) Run(server *Server, queues ...CrossQueue) error {
slf.server = server
slf.queues = map[CrossQueueName]CrossQueue{}
slf.messagePool = synchronization.NewPool[*crossMessage](100,
func() *crossMessage {
return &crossMessage{}
}, func(data *crossMessage) {
data.toServerId = 0
data.ServerId = 0
data.Queue = ""
data.Packet = nil
},
)
slf.messageChannel = make(chan *crossMessage, 4096*100)
for i := 0; i < len(slf.queues); i++ {
queue := queues[i]
if _, exist := slf.queues[queue.GetName()]; exist {
return ErrCrossDuplicateQueue
}
if err := queue.Init(); err != nil {
return err
}
slf.queues[queue.GetName()] = queue
queue.Subscribe(slf.server.GetID(), func(bytes []byte) {
message := slf.messagePool.Get()
if err := json.Unmarshal(bytes, message); err != nil {
log.Error("Cross", zap.String("Queue.Receive", string(queue.GetName())), zap.String("Packet", string(bytes)), zap.Error(err))
return
}
slf.server.PushMessage(MessageTypeCross, message.ServerId, message.Queue, message.Packet)
slf.messagePool.Release(message)
})
}
go func() {
for message := range slf.messageChannel {
queue := slf.queues[message.Queue]
data, err := json.Marshal(message)
if err != nil {
log.Error("Cross", zap.String("Queue.Push", string(queue.GetName())), zap.String("Packet", string(message.Packet)), zap.Error(err))
} else if err = queue.Publish(message.toServerId, data); err != nil {
log.Error("Cross", zap.String("Queue.Push", string(queue.GetName())), zap.Error(err))
}
slf.messagePool.Release(message)
}
}()
return nil
}
func (slf *cross) PushCrossMessage(queue CrossQueueName, serverId int64, packet []byte) {
message := slf.messagePool.Get()
message.toServerId = serverId
message.ServerId = slf.server.GetID()
message.Queue = queue
message.Packet = packet
slf.messageChannel <- message
}
func (slf *cross) shutdownCross() {
close(slf.messageChannel)
slf.messagePool.Close()
slf.messagePool = nil
} }

View File

@ -1,8 +0,0 @@
package server
type crossMessage struct {
toServerId int64
ServerId int64 `json:"server_id"`
Queue CrossQueueName `json:"queue"`
Packet []byte `json:"packet"`
}

View File

@ -1,15 +0,0 @@
package server
type CrossQueueName string
// CrossQueue 跨服消息队列接口
type CrossQueue interface {
// GetName 获取跨服消息队列名称
GetName() CrossQueueName
// Init 初始化队列
Init() error
// Publish 发布跨服消息
Publish(serverId int64, packet []byte) error
// Subscribe 接收到跨服消息
Subscribe(serverId int64, packetHandle func([]byte))
}

View File

@ -8,7 +8,7 @@ var (
ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte") ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte")
ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)") ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)")
ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction") ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction")
ErrMessageTypeCrossErrorAttrs = errors.New("MessageTypeCross must contain int64(server id) and CrossQueueName and []byte") ErrMessageTypeCrossErrorAttrs = errors.New("MessageTypeCross must contain int64(server id) and []byte")
ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported") ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported")
ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported") ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported")
ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp")
@ -18,6 +18,5 @@ var (
ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register") ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register")
ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register") ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register")
ErrOnlySupportSocket = errors.New("only supports Socket programming") ErrOnlySupportSocket = errors.New("only supports Socket programming")
ErrNoSupportGetID = errors.New("the server does not support GetID, please use the WithCross option to create the server") ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server")
ErrCrossDuplicateQueue = errors.New("cross duplicate registration Queue")
) )

View File

@ -23,7 +23,7 @@ type event struct {
connectionReceiveWebsocketPacketEventHandles []ConnectionReceiveWebsocketPacketEventHandle connectionReceiveWebsocketPacketEventHandles []ConnectionReceiveWebsocketPacketEventHandle
connectionOpenedEventHandles []ConnectionOpenedEventHandle connectionOpenedEventHandles []ConnectionOpenedEventHandle
connectionClosedEventHandles []ConnectionClosedEventHandle connectionClosedEventHandles []ConnectionClosedEventHandle
receiveCrossPacketEventHandles map[CrossQueueName][]ReceiveCrossPacketEventHandle receiveCrossPacketEventHandles []ReceiveCrossPacketEventHandle
} }
// RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数
@ -120,15 +120,12 @@ func (slf *event) OnConnectionReceiveWebsocketPacketEvent(conn *Conn, packet []b
} }
// RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数 // RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数
func (slf *event) RegReceiveCrossPacketEvent(queue CrossQueueName, handle ReceiveCrossPacketEventHandle) { func (slf *event) RegReceiveCrossPacketEvent(handle ReceiveCrossPacketEventHandle) {
if slf.receiveCrossPacketEventHandles == nil { slf.receiveCrossPacketEventHandles = append(slf.receiveCrossPacketEventHandles, handle)
slf.receiveCrossPacketEventHandles = map[CrossQueueName][]ReceiveCrossPacketEventHandle{}
}
slf.receiveCrossPacketEventHandles[queue] = append(slf.receiveCrossPacketEventHandles[queue], handle)
} }
func (slf *event) OnReceiveCrossPacketEvent(serverId int64, queue CrossQueueName, packet []byte) { func (slf *event) OnReceiveCrossPacketEvent(serverId int64, packet []byte) {
for _, handle := range slf.receiveCrossPacketEventHandles[queue] { for _, handle := range slf.receiveCrossPacketEventHandles {
handle(slf.Server, serverId, packet) handle(slf.Server, serverId, packet)
} }
} }
@ -149,7 +146,7 @@ func (slf *event) check() {
} }
} }
if len(slf.receiveCrossPacketEventHandles) > 0 && slf.id == nil { if len(slf.receiveCrossPacketEventHandles) > 0 && slf.cross == nil {
log.Warn("Server", zap.String("ReceiveCrossPacketEvent", "invalid server, not register cross server")) log.Warn("Server", zap.String("ReceiveCrossPacketEvent", "invalid server, not register cross server"))
} }

View File

@ -11,8 +11,8 @@ const (
// - server.MessageErrorAction // - server.MessageErrorAction
MessageTypeError MessageTypeError
// MessageTypeCross 跨服消息类型:该类型的数据将被发送到对应服务器中进行处理 // MessageTypeCross 跨服消息类型:将被推送到跨服的 Cross 实现中进行处理
// - int64(sender serverId) // - int64(serverId)
// - []byte // - []byte
MessageTypeCross MessageTypeCross
) )
@ -98,17 +98,14 @@ func (slf MessageType) deconstructError(attrs ...any) (err error, action Message
return return
} }
func (slf MessageType) deconstructCross(attrs ...any) (serverId int64, queue CrossQueueName, packet []byte) { func (slf MessageType) deconstructCross(attrs ...any) (serverId int64, packet []byte) {
if len(attrs) != 3 { if len(attrs) != 2 {
panic(ErrMessageTypeCrossErrorAttrs) panic(ErrMessageTypeCrossErrorAttrs)
} }
var ok bool var ok bool
if serverId, ok = attrs[0].(int64); !ok { if serverId, ok = attrs[0].(int64); !ok {
panic(ErrMessageTypeCrossErrorAttrs) panic(ErrMessageTypeCrossErrorAttrs)
} }
if queue, ok = attrs[0].(CrossQueueName); !ok {
panic(ErrMessageTypeCrossErrorAttrs)
}
if packet, ok = attrs[1].([]byte); !ok { if packet, ok = attrs[1].([]byte); !ok {
panic(ErrMessageTypeCrossErrorAttrs) panic(ErrMessageTypeCrossErrorAttrs)
} }

View File

@ -3,10 +3,8 @@ package server
import ( import (
"github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/hash"
"github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/log"
"github.com/pkg/errors"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"runtime/debug"
) )
const ( const (
@ -26,17 +24,12 @@ type Option func(srv *Server)
type CrossRegisterHandle func(server *Server) error type CrossRegisterHandle func(server *Server) error
// WithCross 通过跨服的方式创建服务器 // WithCross 通过跨服的方式创建服务器
// - CrossQueue: 跨服队列是用于接收和发送跨服消息的队列接口 func WithCross(serverId int64, cross Cross) Option {
func WithCross(serverId int64, queues ...CrossQueue) Option {
return func(srv *Server) { return func(srv *Server) {
srv.id = &serverId srv.id = serverId
srv.RegStartFinishEvent(func(srv *Server) { srv.cross = cross
srv.cross = new(cross) srv.cross.Init(serverId, func(serverId int64, packet []byte) {
if err := srv.cross.Run(srv, queues...); err != nil { srv.PushMessage(MessageTypeCross, serverId, packet)
srv.PushMessage(MessageTypeError, errors.WithMessage(err, string(debug.Stack())), MessageErrorActionShutdown)
return
}
log.Info("Server", zap.Int64("CrossID", serverId))
}) })
} }
} }

View File

@ -52,9 +52,9 @@ func New(network Network, options ...Option) *Server {
// Server 网络服务器 // Server 网络服务器
type Server struct { type Server struct {
*event *event // 事件
*cross cross Cross // 跨服
id *int64 // 服务器id id int64 // 服务器id
network Network // 网络类型 network Network // 网络类型
addr string // 侦听地址 addr string // 侦听地址
options []Option // 选项 options []Option // 选项
@ -326,10 +326,10 @@ func (slf *Server) IsDev() bool {
// GetID 获取服务器id // GetID 获取服务器id
func (slf *Server) GetID() int64 { func (slf *Server) GetID() int64 {
if slf.id == nil { if slf.cross == nil {
panic(ErrNoSupportGetID) panic(ErrNoSupportCross)
} }
return *slf.id return slf.id
} }
// Shutdown 停止运行服务器 // Shutdown 停止运行服务器
@ -401,6 +401,14 @@ func (slf *Server) PushMessage(messageType MessageType, attrs ...any) {
} }
} }
// PushCrossMessage 推送跨服消息
func (slf *Server) PushCrossMessage(serverId int64, packet []byte) error {
if slf.cross == nil {
return ErrNoSupportCross
}
return slf.cross.PushMessage(serverId, packet)
}
// dispatchMessage 消息分发 // dispatchMessage 消息分发
func (slf *Server) dispatchMessage(msg *message) { func (slf *Server) dispatchMessage(msg *message) {
defer func() { defer func() {
@ -432,8 +440,8 @@ func (slf *Server) dispatchMessage(msg *message) {
log.Warn("Server", zap.String("not support message error action", action.String())) log.Warn("Server", zap.String("not support message error action", action.String()))
} }
case MessageTypeCross: case MessageTypeCross:
serverId, queue, packet := msg.t.deconstructCross(msg.attrs...) serverId, packet := msg.t.deconstructCross(msg.attrs...)
slf.OnReceiveCrossPacketEvent(serverId, queue, packet) slf.OnReceiveCrossPacketEvent(serverId, packet)
default: default:
log.Warn("Server", zap.String("not support message type", msg.t.String())) log.Warn("Server", zap.String("not support message type", msg.t.String()))
} }