基于Nats的跨服实现

This commit is contained in:
kercylan98 2023-05-16 11:39:10 +08:00
parent 0439bffb99
commit fc9d334695
5 changed files with 84 additions and 2 deletions

6
server/corss/message.go Normal file
View File

@ -0,0 +1,6 @@
package corss
type Message struct {
ServerId int64 `json:"server_id"`
Packet []byte `json:"packet"`
}

70
server/corss/nats.go Normal file
View File

@ -0,0 +1,70 @@
package corss
import (
"encoding/json"
"fmt"
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/synchronization"
"github.com/nats-io/nats.go"
"go.uber.org/zap"
)
func NewNats(url string, options ...nats.Option) *Nats {
return &Nats{
url: url,
subject: "MINOTAUR_CROSS",
options: options,
messagePool: synchronization.NewPool[*Message](1024*100, func() *Message {
return new(Message)
}, func(data *Message) {
data.ServerId = 0
data.Packet = nil
}),
}
}
func NewNatsWithSubject(url, subject string, options ...nats.Option) *Nats {
n := NewNats(url, options...)
n.subject = subject
return n
}
type Nats struct {
conn *nats.Conn
url string
subject string
realSubject string
options []nats.Option
messagePool *synchronization.Pool[*Message]
}
func (slf *Nats) Init(server *server.Server, packetHandle func(serverId int64, packet []byte)) (err error) {
slf.realSubject = fmt.Sprintf("%s_%d", slf.subject, server.GetID())
slf.conn, err = nats.Connect(slf.url, slf.options...)
if err != nil {
return err
}
_, err = slf.conn.Subscribe(slf.realSubject, func(msg *nats.Msg) {
message := slf.messagePool.Get()
defer slf.messagePool.Release(message)
if err := json.Unmarshal(msg.Data, &message); err != nil {
log.Error("Cross.Nats", zap.Error(err))
return
}
packetHandle(message.ServerId, message.Packet)
})
return err
}
func (slf *Nats) PushMessage(serverId int64, packet []byte) error {
message := slf.messagePool.Get()
defer slf.messagePool.Release(message)
message.ServerId = serverId
message.Packet = packet
data, err := json.Marshal(message)
if err != nil {
return err
}
return slf.conn.Publish(slf.realSubject, data)
}

View File

@ -5,7 +5,7 @@ type Cross interface {
// - serverId: 本服id
// - packetHandle.serverId: 发送跨服消息的服务器id
// - packetHandle.packet: 数据包
Init(serverId int64, packetHandle func(serverId int64, packet []byte))
Init(server *Server, packetHandle func(serverId int64, packet []byte)) error
// PushMessage 推送跨服消息
// - serverId: 目标服务器id
PushMessage(serverId int64, packet []byte) error

View File

@ -122,6 +122,7 @@ func (slf *event) OnConnectionReceiveWebsocketPacketEvent(conn *Conn, packet []b
// RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数
func (slf *event) RegReceiveCrossPacketEvent(handle ReceiveCrossPacketEventHandle) {
slf.receiveCrossPacketEventHandles = append(slf.receiveCrossPacketEventHandles, handle)
log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String()))
}
func (slf *event) OnReceiveCrossPacketEvent(serverId int64, packet []byte) {

View File

@ -5,6 +5,7 @@ import (
"github.com/kercylan98/minotaur/utils/log"
"go.uber.org/zap"
"google.golang.org/grpc"
"reflect"
)
const (
@ -28,9 +29,13 @@ func WithCross(serverId int64, cross Cross) Option {
return func(srv *Server) {
srv.id = serverId
srv.cross = cross
srv.cross.Init(serverId, func(serverId int64, packet []byte) {
err := srv.cross.Init(srv, func(serverId int64, packet []byte) {
srv.PushMessage(MessageTypeCross, serverId, packet)
})
if err != nil {
log.Error("WithCross", zap.Int64("ServerID", serverId), zap.String("Cross", reflect.TypeOf(cross).String()))
panic(err)
}
}
}