diff --git a/server/corss/message.go b/server/corss/message.go new file mode 100644 index 0000000..138eadc --- /dev/null +++ b/server/corss/message.go @@ -0,0 +1,6 @@ +package corss + +type Message struct { + ServerId int64 `json:"server_id"` + Packet []byte `json:"packet"` +} diff --git a/server/corss/nats.go b/server/corss/nats.go new file mode 100644 index 0000000..6b53d28 --- /dev/null +++ b/server/corss/nats.go @@ -0,0 +1,70 @@ +package corss + +import ( + "encoding/json" + "fmt" + "github.com/kercylan98/minotaur/server" + "github.com/kercylan98/minotaur/utils/log" + "github.com/kercylan98/minotaur/utils/synchronization" + "github.com/nats-io/nats.go" + "go.uber.org/zap" +) + +func NewNats(url string, options ...nats.Option) *Nats { + return &Nats{ + url: url, + subject: "MINOTAUR_CROSS", + options: options, + messagePool: synchronization.NewPool[*Message](1024*100, func() *Message { + return new(Message) + }, func(data *Message) { + data.ServerId = 0 + data.Packet = nil + }), + } +} + +func NewNatsWithSubject(url, subject string, options ...nats.Option) *Nats { + n := NewNats(url, options...) + n.subject = subject + return n +} + +type Nats struct { + conn *nats.Conn + url string + subject string + realSubject string + options []nats.Option + messagePool *synchronization.Pool[*Message] +} + +func (slf *Nats) Init(server *server.Server, packetHandle func(serverId int64, packet []byte)) (err error) { + slf.realSubject = fmt.Sprintf("%s_%d", slf.subject, server.GetID()) + slf.conn, err = nats.Connect(slf.url, slf.options...) + if err != nil { + return err + } + _, err = slf.conn.Subscribe(slf.realSubject, func(msg *nats.Msg) { + message := slf.messagePool.Get() + defer slf.messagePool.Release(message) + if err := json.Unmarshal(msg.Data, &message); err != nil { + log.Error("Cross.Nats", zap.Error(err)) + return + } + packetHandle(message.ServerId, message.Packet) + }) + return err +} + +func (slf *Nats) PushMessage(serverId int64, packet []byte) error { + message := slf.messagePool.Get() + defer slf.messagePool.Release(message) + message.ServerId = serverId + message.Packet = packet + data, err := json.Marshal(message) + if err != nil { + return err + } + return slf.conn.Publish(slf.realSubject, data) +} diff --git a/server/cross.go b/server/cross.go index d4b5493..28d1c10 100644 --- a/server/cross.go +++ b/server/cross.go @@ -5,7 +5,7 @@ type Cross interface { // - serverId: 本服id // - packetHandle.serverId: 发送跨服消息的服务器id // - packetHandle.packet: 数据包 - Init(serverId int64, packetHandle func(serverId int64, packet []byte)) + Init(server *Server, packetHandle func(serverId int64, packet []byte)) error // PushMessage 推送跨服消息 // - serverId: 目标服务器id PushMessage(serverId int64, packet []byte) error diff --git a/server/event.go b/server/event.go index 951dce9..84b4e8f 100644 --- a/server/event.go +++ b/server/event.go @@ -122,6 +122,7 @@ func (slf *event) OnConnectionReceiveWebsocketPacketEvent(conn *Conn, packet []b // RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数 func (slf *event) RegReceiveCrossPacketEvent(handle ReceiveCrossPacketEventHandle) { slf.receiveCrossPacketEventHandles = append(slf.receiveCrossPacketEventHandles, handle) + log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String())) } func (slf *event) OnReceiveCrossPacketEvent(serverId int64, packet []byte) { diff --git a/server/options.go b/server/options.go index b58e0f9..dba24b2 100644 --- a/server/options.go +++ b/server/options.go @@ -5,6 +5,7 @@ import ( "github.com/kercylan98/minotaur/utils/log" "go.uber.org/zap" "google.golang.org/grpc" + "reflect" ) const ( @@ -28,9 +29,13 @@ func WithCross(serverId int64, cross Cross) Option { return func(srv *Server) { srv.id = serverId srv.cross = cross - srv.cross.Init(serverId, func(serverId int64, packet []byte) { + err := srv.cross.Init(srv, func(serverId int64, packet []byte) { srv.PushMessage(MessageTypeCross, serverId, packet) }) + if err != nil { + log.Error("WithCross", zap.Int64("ServerID", serverId), zap.String("Cross", reflect.TypeOf(cross).String())) + panic(err) + } } }