跨服功能实现

This commit is contained in:
kercylan98 2023-05-15 15:37:50 +08:00
parent 57460ff40b
commit 9b7fd2b4cf
10 changed files with 206 additions and 8 deletions

9
go.mod
View File

@ -29,15 +29,22 @@ require (
github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/compress v1.16.4 // indirect
github.com/klauspost/cpuid/v2 v2.2.2 // indirect
github.com/klauspost/reedsolomon v1.11.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/nats-io/jwt/v2 v2.4.1 // indirect
github.com/nats-io/nats-server/v2 v2.9.16 // indirect
github.com/nats-io/nats.go v1.25.0 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/richardlehane/mscfb v1.0.4 // indirect
@ -53,12 +60,14 @@ require (
github.com/xuri/excelize/v2 v2.7.1 // indirect
github.com/xuri/nfp v0.0.0-20220409054826-5e722a1d9e22 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/automaxprocs v1.5.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect

19
go.sum
View File

@ -68,6 +68,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU=
github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid v1.2.4/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
@ -93,6 +95,8 @@ github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205Ah
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mmcloughlin/avo v0.0.0-20200803215136-443f81d77104/go.mod h1:wqKykBG2QzQDJEzvRkcS8x6MiSJkF52hXZsXcjaB3ls=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@ -100,6 +104,16 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.16 h1:SuNe6AyCcVy0g5326wtyU8TdqYmcPqzTjhkHojAjprc=
github.com/nats-io/nats-server/v2 v2.9.16/go.mod h1:z1cc5Q+kqJkz9mLUdlcSsdYnId4pyImHjNgoh6zxSC0=
github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE=
github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/panjf2000/ants/v2 v2.4.7 h1:MZnw2JRyTJxFwtaMtUJcwE618wKD04POWk2gwwP4E2M=
github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/panjf2000/gnet v1.6.6 h1:P6bApc54hnVcJVgH+SMe41mn47ECCajB6E/dKq27Y0c=
@ -170,6 +184,8 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
@ -233,6 +249,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -264,6 +281,8 @@ golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=

View File

@ -1,9 +1,79 @@
package server
// Cross 跨服功能接口实现
type Cross interface {
// PushPacket 推送数据包
PushPacket(serverId int64, packet []byte) error
import (
"encoding/json"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/synchronization"
"go.uber.org/zap"
)
//
// cross 跨服功能
type cross struct {
server *Server
messageChannel chan *crossMessage
messagePool *synchronization.Pool[*crossMessage]
queues map[CrossQueueName]CrossQueue
}
func (slf *cross) Run(server *Server, queues ...CrossQueue) error {
slf.server = server
slf.queues = map[CrossQueueName]CrossQueue{}
slf.messagePool = synchronization.NewPool[*crossMessage](100,
func() *crossMessage {
return &crossMessage{}
}, func(data *crossMessage) {
data.toServerId = 0
data.ServerId = 0
data.Queue = ""
data.Packet = nil
},
)
slf.messageChannel = make(chan *crossMessage, 4096*100)
for i := 0; i < len(slf.queues); i++ {
queue := queues[i]
if _, exist := slf.queues[queue.GetName()]; exist {
return ErrCrossDuplicateQueue
}
if err := queue.Init(); err != nil {
return err
}
slf.queues[queue.GetName()] = queue
queue.Subscribe(slf.server.GetID(), func(bytes []byte) {
message := slf.messagePool.Get()
if err := json.Unmarshal(bytes, message); err != nil {
log.Error("Cross", zap.String("Queue.Receive", string(queue.GetName())), zap.String("Packet", string(bytes)), zap.Error(err))
return
}
slf.server.PushMessage(MessageTypeCross, message.ServerId, message.Queue, message.Packet)
slf.messagePool.Release(message)
})
}
go func() {
for message := range slf.messageChannel {
queue := slf.queues[message.Queue]
data, err := json.Marshal(message)
if err != nil {
log.Error("Cross", zap.String("Queue.Push", string(queue.GetName())), zap.String("Packet", string(message.Packet)), zap.Error(err))
} else if err = queue.Publish(message.toServerId, data); err != nil {
log.Error("Cross", zap.String("Queue.Push", string(queue.GetName())), zap.Error(err))
}
slf.messagePool.Release(message)
}
}()
return nil
}
func (slf *cross) PushCrossMessage(queue CrossQueueName, serverId int64, packet []byte) {
message := slf.messagePool.Get()
message.toServerId = serverId
message.ServerId = slf.server.GetID()
message.Queue = queue
message.Packet = packet
slf.messageChannel <- message
}
func (slf *cross) shutdownCross() {
close(slf.messageChannel)
slf.messagePool.Close()
slf.messagePool = nil
}

8
server/cross_message.go Normal file
View File

@ -0,0 +1,8 @@
package server
type crossMessage struct {
toServerId int64
ServerId int64 `json:"server_id"`
Queue CrossQueueName `json:"queue"`
Packet []byte `json:"packet"`
}

15
server/cross_queue.go Normal file
View File

@ -0,0 +1,15 @@
package server
type CrossQueueName string
// CrossQueue 跨服消息队列接口
type CrossQueue interface {
// GetName 获取跨服消息队列名称
GetName() CrossQueueName
// Init 初始化队列
Init() error
// Publish 发布跨服消息
Publish(serverId int64, packet []byte) error
// Subscribe 接收到跨服消息
Subscribe(serverId int64, packetHandle func([]byte))
}

View File

@ -8,6 +8,7 @@ var (
ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte")
ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)")
ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction")
ErrMessageTypeCrossErrorAttrs = errors.New("MessageTypeCross must contain int64(server id) and CrossQueueName and []byte")
ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported")
ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported")
ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp")
@ -17,4 +18,6 @@ var (
ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register")
ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register")
ErrOnlySupportSocket = errors.New("only supports Socket programming")
ErrNoSupportGetID = errors.New("the server does not support GetID, please use the WithCross option to create the server")
ErrCrossDuplicateQueue = errors.New("cross duplicate registration Queue")
)

View File

@ -13,6 +13,7 @@ type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []b
type ConnectionReceiveWebsocketPacketEventHandle func(srv *Server, conn *Conn, packet []byte, messageType int)
type ConnectionOpenedEventHandle func(srv *Server, conn *Conn)
type ConnectionClosedEventHandle func(srv *Server, conn *Conn)
type ReceiveCrossPacketEventHandle func(srv *Server, senderServerId int64, packet []byte)
type event struct {
*Server
@ -22,6 +23,7 @@ type event struct {
connectionReceiveWebsocketPacketEventHandles []ConnectionReceiveWebsocketPacketEventHandle
connectionOpenedEventHandles []ConnectionOpenedEventHandle
connectionClosedEventHandles []ConnectionClosedEventHandle
receiveCrossPacketEventHandles map[CrossQueueName][]ReceiveCrossPacketEventHandle
}
// RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数
@ -117,6 +119,20 @@ func (slf *event) OnConnectionReceiveWebsocketPacketEvent(conn *Conn, packet []b
}
}
// RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数
func (slf *event) RegReceiveCrossPacketEvent(queue CrossQueueName, handle ReceiveCrossPacketEventHandle) {
if slf.receiveCrossPacketEventHandles == nil {
slf.receiveCrossPacketEventHandles = map[CrossQueueName][]ReceiveCrossPacketEventHandle{}
}
slf.receiveCrossPacketEventHandles[queue] = append(slf.receiveCrossPacketEventHandles[queue], handle)
}
func (slf *event) OnReceiveCrossPacketEvent(serverId int64, queue CrossQueueName, packet []byte) {
for _, handle := range slf.receiveCrossPacketEventHandles[queue] {
handle(slf.Server, serverId, packet)
}
}
func (slf *event) check() {
switch slf.network {
case NetworkHttp, NetworkGRPC:
@ -124,13 +140,17 @@ func (slf *event) check() {
switch slf.network {
case NetworkWebsocket:
if len(slf.connectionReceiveWebsocketPacketEventHandles) == 0 {
log.Warn("Server", zap.String("ConnectionReceiveWebsocketPacketEvent", "Invalid server, no packets processed"))
log.Warn("Server", zap.String("ConnectionReceiveWebsocketPacketEvent", "invalid server, no packets processed"))
}
default:
if len(slf.connectionReceivePacketEventHandles) == 0 {
log.Warn("Server", zap.String("ConnectionReceivePacketEvent", "Invalid server, no packets processed"))
log.Warn("Server", zap.String("ConnectionReceivePacketEvent", "invalid server, no packets processed"))
}
}
}
if len(slf.receiveCrossPacketEventHandles) > 0 && slf.id == nil {
log.Warn("Server", zap.String("ReceiveCrossPacketEvent", "invalid server, not register cross server"))
}
}

View File

@ -10,6 +10,11 @@ const (
// - error
// - server.MessageErrorAction
MessageTypeError
// MessageTypeCross 跨服消息类型:该类型的数据将被发送到对应服务器中进行处理
// - int64(sender serverId)
// - []byte
MessageTypeCross
)
var messageNames = map[MessageType]string{
@ -92,3 +97,20 @@ func (slf MessageType) deconstructError(attrs ...any) (err error, action Message
}
return
}
func (slf MessageType) deconstructCross(attrs ...any) (serverId int64, queue CrossQueueName, packet []byte) {
if len(attrs) != 3 {
panic(ErrMessageTypeCrossErrorAttrs)
}
var ok bool
if serverId, ok = attrs[0].(int64); !ok {
panic(ErrMessageTypeCrossErrorAttrs)
}
if queue, ok = attrs[0].(CrossQueueName); !ok {
panic(ErrMessageTypeCrossErrorAttrs)
}
if packet, ok = attrs[1].([]byte); !ok {
panic(ErrMessageTypeCrossErrorAttrs)
}
return
}

View File

@ -3,8 +3,10 @@ package server
import (
"github.com/kercylan98/minotaur/utils/hash"
"github.com/kercylan98/minotaur/utils/log"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"runtime/debug"
)
const (
@ -21,6 +23,23 @@ const (
)
type Option func(srv *Server)
type CrossRegisterHandle func(server *Server) error
// WithCross 通过跨服的方式创建服务器
// - CrossQueue: 跨服队列是用于接收和发送跨服消息的队列接口
func WithCross(serverId int64, queues ...CrossQueue) Option {
return func(srv *Server) {
srv.id = &serverId
srv.RegStartFinishEvent(func(srv *Server) {
srv.cross = new(cross)
if err := srv.cross.Run(srv, queues...); err != nil {
srv.PushMessage(MessageTypeError, errors.WithMessage(err, string(debug.Stack())), MessageErrorActionShutdown)
return
}
log.Info("Server", zap.Int64("CrossID", serverId))
})
}
}
// WithConnectPacketDiversion 通过连接数据包消息分流的方式创建服务器
// - 连接消息分流后数据包消息将会从其他消息类型中独立出来,并且由多个消息管道及协程进行处理

View File

@ -53,6 +53,8 @@ func New(network Network, options ...Option) *Server {
// Server 网络服务器
type Server struct {
*event
*cross
id *int64 // 服务器id
network Network // 网络类型
addr string // 侦听地址
options []Option // 选项
@ -99,7 +101,7 @@ func (slf *Server) Run(addr string) error {
var connectionInitHandle = func(callback func()) {
slf.initMessageChannel = true
if slf.messagePoolSize <= 0 {
slf.messagePoolSize = 4096 * 1024
slf.messagePoolSize = 100
}
slf.messagePool = synchronization.NewPool[*message](slf.messagePoolSize,
func() *message {
@ -322,6 +324,14 @@ func (slf *Server) IsDev() bool {
return !slf.prod
}
// GetID 获取服务器id
func (slf *Server) GetID() int64 {
if slf.id == nil {
panic(ErrNoSupportGetID)
}
return *slf.id
}
// Shutdown 停止运行服务器
func (slf *Server) Shutdown(err error) {
slf.isShutdown.Store(true)
@ -421,6 +431,9 @@ func (slf *Server) dispatchMessage(msg *message) {
default:
log.Warn("Server", zap.String("not support message error action", action.String()))
}
case MessageTypeCross:
serverId, queue, packet := msg.t.deconstructCross(msg.attrs...)
slf.OnReceiveCrossPacketEvent(serverId, queue, packet)
default:
log.Warn("Server", zap.String("not support message type", msg.t.String()))
}