From 9b7fd2b4cf351c91c951062a18d2cecfe4e03640 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Mon, 15 May 2023 15:37:50 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B7=A8=E6=9C=8D=E5=8A=9F=E8=83=BD=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 9 +++++ go.sum | 19 ++++++++++ server/cross.go | 80 ++++++++++++++++++++++++++++++++++++++--- server/cross_message.go | 8 +++++ server/cross_queue.go | 15 ++++++++ server/errors.go | 3 ++ server/event.go | 24 +++++++++++-- server/message.go | 22 ++++++++++++ server/options.go | 19 ++++++++++ server/server.go | 15 +++++++- 10 files changed, 206 insertions(+), 8 deletions(-) create mode 100644 server/cross_message.go create mode 100644 server/cross_queue.go diff --git a/go.mod b/go.mod index 6005c88..db9aa93 100644 --- a/go.mod +++ b/go.mod @@ -29,15 +29,22 @@ require ( github.com/jonboulle/clockwork v0.3.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect + github.com/klauspost/compress v1.16.4 // indirect github.com/klauspost/cpuid/v2 v2.2.2 // indirect github.com/klauspost/reedsolomon v1.11.7 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/mattn/go-isatty v0.0.17 // indirect + github.com/minio/highwayhash v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect + github.com/nats-io/jwt/v2 v2.4.1 // indirect + github.com/nats-io/nats-server/v2 v2.9.16 // indirect + github.com/nats-io/nats.go v1.25.0 // indirect + github.com/nats-io/nkeys v0.4.4 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/richardlehane/mscfb v1.0.4 // indirect @@ -53,12 +60,14 @@ require ( github.com/xuri/excelize/v2 v2.7.1 // indirect github.com/xuri/nfp v0.0.0-20220409054826-5e722a1d9e22 // indirect go.uber.org/atomic v1.10.0 // indirect + go.uber.org/automaxprocs v1.5.1 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect golang.org/x/crypto v0.8.0 // indirect golang.org/x/net v0.9.0 // indirect golang.org/x/sys v0.7.0 // indirect golang.org/x/text v0.9.0 // indirect + golang.org/x/time v0.3.0 // indirect google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect diff --git a/go.sum b/go.sum index a10d244..5b81de5 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU= +github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid v1.2.4/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -93,6 +95,8 @@ github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205Ah github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mmcloughlin/avo v0.0.0-20200803215136-443f81d77104/go.mod h1:wqKykBG2QzQDJEzvRkcS8x6MiSJkF52hXZsXcjaB3ls= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -100,6 +104,16 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw= github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8= +github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= +github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= +github.com/nats-io/nats-server/v2 v2.9.16 h1:SuNe6AyCcVy0g5326wtyU8TdqYmcPqzTjhkHojAjprc= +github.com/nats-io/nats-server/v2 v2.9.16/go.mod h1:z1cc5Q+kqJkz9mLUdlcSsdYnId4pyImHjNgoh6zxSC0= +github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE= +github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg= +github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= +github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/panjf2000/ants/v2 v2.4.7 h1:MZnw2JRyTJxFwtaMtUJcwE618wKD04POWk2gwwP4E2M= github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= github.com/panjf2000/gnet v1.6.6 h1:P6bApc54hnVcJVgH+SMe41mn47ECCajB6E/dKq27Y0c= @@ -170,6 +184,8 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk= +go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= @@ -233,6 +249,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -264,6 +281,8 @@ golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/server/cross.go b/server/cross.go index 6e83318..cc8ab9f 100644 --- a/server/cross.go +++ b/server/cross.go @@ -1,9 +1,79 @@ package server -// Cross 跨服功能接口实现 -type Cross interface { - // PushPacket 推送数据包 - PushPacket(serverId int64, packet []byte) error +import ( + "encoding/json" + "github.com/kercylan98/minotaur/utils/log" + "github.com/kercylan98/minotaur/utils/synchronization" + "go.uber.org/zap" +) - // +// cross 跨服功能 +type cross struct { + server *Server + messageChannel chan *crossMessage + messagePool *synchronization.Pool[*crossMessage] + queues map[CrossQueueName]CrossQueue +} + +func (slf *cross) Run(server *Server, queues ...CrossQueue) error { + slf.server = server + slf.queues = map[CrossQueueName]CrossQueue{} + slf.messagePool = synchronization.NewPool[*crossMessage](100, + func() *crossMessage { + return &crossMessage{} + }, func(data *crossMessage) { + data.toServerId = 0 + data.ServerId = 0 + data.Queue = "" + data.Packet = nil + }, + ) + slf.messageChannel = make(chan *crossMessage, 4096*100) + for i := 0; i < len(slf.queues); i++ { + queue := queues[i] + if _, exist := slf.queues[queue.GetName()]; exist { + return ErrCrossDuplicateQueue + } + if err := queue.Init(); err != nil { + return err + } + slf.queues[queue.GetName()] = queue + queue.Subscribe(slf.server.GetID(), func(bytes []byte) { + message := slf.messagePool.Get() + if err := json.Unmarshal(bytes, message); err != nil { + log.Error("Cross", zap.String("Queue.Receive", string(queue.GetName())), zap.String("Packet", string(bytes)), zap.Error(err)) + return + } + slf.server.PushMessage(MessageTypeCross, message.ServerId, message.Queue, message.Packet) + slf.messagePool.Release(message) + }) + } + go func() { + for message := range slf.messageChannel { + queue := slf.queues[message.Queue] + data, err := json.Marshal(message) + if err != nil { + log.Error("Cross", zap.String("Queue.Push", string(queue.GetName())), zap.String("Packet", string(message.Packet)), zap.Error(err)) + } else if err = queue.Publish(message.toServerId, data); err != nil { + log.Error("Cross", zap.String("Queue.Push", string(queue.GetName())), zap.Error(err)) + } + slf.messagePool.Release(message) + } + }() + return nil +} + +func (slf *cross) PushCrossMessage(queue CrossQueueName, serverId int64, packet []byte) { + message := slf.messagePool.Get() + message.toServerId = serverId + message.ServerId = slf.server.GetID() + message.Queue = queue + message.Packet = packet + slf.messageChannel <- message +} + +func (slf *cross) shutdownCross() { + close(slf.messageChannel) + slf.messagePool.Close() + slf.messagePool = nil } diff --git a/server/cross_message.go b/server/cross_message.go new file mode 100644 index 0000000..f80208f --- /dev/null +++ b/server/cross_message.go @@ -0,0 +1,8 @@ +package server + +type crossMessage struct { + toServerId int64 + ServerId int64 `json:"server_id"` + Queue CrossQueueName `json:"queue"` + Packet []byte `json:"packet"` +} diff --git a/server/cross_queue.go b/server/cross_queue.go new file mode 100644 index 0000000..1bc6daa --- /dev/null +++ b/server/cross_queue.go @@ -0,0 +1,15 @@ +package server + +type CrossQueueName string + +// CrossQueue 跨服消息队列接口 +type CrossQueue interface { + // GetName 获取跨服消息队列名称 + GetName() CrossQueueName + // Init 初始化队列 + Init() error + // Publish 发布跨服消息 + Publish(serverId int64, packet []byte) error + // Subscribe 接收到跨服消息 + Subscribe(serverId int64, packetHandle func([]byte)) +} diff --git a/server/errors.go b/server/errors.go index cf1da3b..1c4637c 100644 --- a/server/errors.go +++ b/server/errors.go @@ -8,6 +8,7 @@ var ( ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte") ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)") ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction") + ErrMessageTypeCrossErrorAttrs = errors.New("MessageTypeCross must contain int64(server id) and CrossQueueName and []byte") ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported") ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported") ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") @@ -17,4 +18,6 @@ var ( ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register") ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register") ErrOnlySupportSocket = errors.New("only supports Socket programming") + ErrNoSupportGetID = errors.New("the server does not support GetID, please use the WithCross option to create the server") + ErrCrossDuplicateQueue = errors.New("cross duplicate registration Queue") ) diff --git a/server/event.go b/server/event.go index 138cac5..5904698 100644 --- a/server/event.go +++ b/server/event.go @@ -13,6 +13,7 @@ type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []b type ConnectionReceiveWebsocketPacketEventHandle func(srv *Server, conn *Conn, packet []byte, messageType int) type ConnectionOpenedEventHandle func(srv *Server, conn *Conn) type ConnectionClosedEventHandle func(srv *Server, conn *Conn) +type ReceiveCrossPacketEventHandle func(srv *Server, senderServerId int64, packet []byte) type event struct { *Server @@ -22,6 +23,7 @@ type event struct { connectionReceiveWebsocketPacketEventHandles []ConnectionReceiveWebsocketPacketEventHandle connectionOpenedEventHandles []ConnectionOpenedEventHandle connectionClosedEventHandles []ConnectionClosedEventHandle + receiveCrossPacketEventHandles map[CrossQueueName][]ReceiveCrossPacketEventHandle } // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 @@ -117,6 +119,20 @@ func (slf *event) OnConnectionReceiveWebsocketPacketEvent(conn *Conn, packet []b } } +// RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数 +func (slf *event) RegReceiveCrossPacketEvent(queue CrossQueueName, handle ReceiveCrossPacketEventHandle) { + if slf.receiveCrossPacketEventHandles == nil { + slf.receiveCrossPacketEventHandles = map[CrossQueueName][]ReceiveCrossPacketEventHandle{} + } + slf.receiveCrossPacketEventHandles[queue] = append(slf.receiveCrossPacketEventHandles[queue], handle) +} + +func (slf *event) OnReceiveCrossPacketEvent(serverId int64, queue CrossQueueName, packet []byte) { + for _, handle := range slf.receiveCrossPacketEventHandles[queue] { + handle(slf.Server, serverId, packet) + } +} + func (slf *event) check() { switch slf.network { case NetworkHttp, NetworkGRPC: @@ -124,13 +140,17 @@ func (slf *event) check() { switch slf.network { case NetworkWebsocket: if len(slf.connectionReceiveWebsocketPacketEventHandles) == 0 { - log.Warn("Server", zap.String("ConnectionReceiveWebsocketPacketEvent", "Invalid server, no packets processed")) + log.Warn("Server", zap.String("ConnectionReceiveWebsocketPacketEvent", "invalid server, no packets processed")) } default: if len(slf.connectionReceivePacketEventHandles) == 0 { - log.Warn("Server", zap.String("ConnectionReceivePacketEvent", "Invalid server, no packets processed")) + log.Warn("Server", zap.String("ConnectionReceivePacketEvent", "invalid server, no packets processed")) } } } + if len(slf.receiveCrossPacketEventHandles) > 0 && slf.id == nil { + log.Warn("Server", zap.String("ReceiveCrossPacketEvent", "invalid server, not register cross server")) + } + } diff --git a/server/message.go b/server/message.go index d9da968..fa59818 100644 --- a/server/message.go +++ b/server/message.go @@ -10,6 +10,11 @@ const ( // - error // - server.MessageErrorAction MessageTypeError + + // MessageTypeCross 跨服消息类型:该类型的数据将被发送到对应服务器中进行处理 + // - int64(sender serverId) + // - []byte + MessageTypeCross ) var messageNames = map[MessageType]string{ @@ -92,3 +97,20 @@ func (slf MessageType) deconstructError(attrs ...any) (err error, action Message } return } + +func (slf MessageType) deconstructCross(attrs ...any) (serverId int64, queue CrossQueueName, packet []byte) { + if len(attrs) != 3 { + panic(ErrMessageTypeCrossErrorAttrs) + } + var ok bool + if serverId, ok = attrs[0].(int64); !ok { + panic(ErrMessageTypeCrossErrorAttrs) + } + if queue, ok = attrs[0].(CrossQueueName); !ok { + panic(ErrMessageTypeCrossErrorAttrs) + } + if packet, ok = attrs[1].([]byte); !ok { + panic(ErrMessageTypeCrossErrorAttrs) + } + return +} diff --git a/server/options.go b/server/options.go index 88a9360..b044646 100644 --- a/server/options.go +++ b/server/options.go @@ -3,8 +3,10 @@ package server import ( "github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/log" + "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc" + "runtime/debug" ) const ( @@ -21,6 +23,23 @@ const ( ) type Option func(srv *Server) +type CrossRegisterHandle func(server *Server) error + +// WithCross 通过跨服的方式创建服务器 +// - CrossQueue: 跨服队列是用于接收和发送跨服消息的队列接口 +func WithCross(serverId int64, queues ...CrossQueue) Option { + return func(srv *Server) { + srv.id = &serverId + srv.RegStartFinishEvent(func(srv *Server) { + srv.cross = new(cross) + if err := srv.cross.Run(srv, queues...); err != nil { + srv.PushMessage(MessageTypeError, errors.WithMessage(err, string(debug.Stack())), MessageErrorActionShutdown) + return + } + log.Info("Server", zap.Int64("CrossID", serverId)) + }) + } +} // WithConnectPacketDiversion 通过连接数据包消息分流的方式创建服务器 // - 连接消息分流后数据包消息将会从其他消息类型中独立出来,并且由多个消息管道及协程进行处理 diff --git a/server/server.go b/server/server.go index f8cc3dc..60a6eb8 100644 --- a/server/server.go +++ b/server/server.go @@ -53,6 +53,8 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { *event + *cross + id *int64 // 服务器id network Network // 网络类型 addr string // 侦听地址 options []Option // 选项 @@ -99,7 +101,7 @@ func (slf *Server) Run(addr string) error { var connectionInitHandle = func(callback func()) { slf.initMessageChannel = true if slf.messagePoolSize <= 0 { - slf.messagePoolSize = 4096 * 1024 + slf.messagePoolSize = 100 } slf.messagePool = synchronization.NewPool[*message](slf.messagePoolSize, func() *message { @@ -322,6 +324,14 @@ func (slf *Server) IsDev() bool { return !slf.prod } +// GetID 获取服务器id +func (slf *Server) GetID() int64 { + if slf.id == nil { + panic(ErrNoSupportGetID) + } + return *slf.id +} + // Shutdown 停止运行服务器 func (slf *Server) Shutdown(err error) { slf.isShutdown.Store(true) @@ -421,6 +431,9 @@ func (slf *Server) dispatchMessage(msg *message) { default: log.Warn("Server", zap.String("not support message error action", action.String())) } + case MessageTypeCross: + serverId, queue, packet := msg.t.deconstructCross(msg.attrs...) + slf.OnReceiveCrossPacketEvent(serverId, queue, packet) default: log.Warn("Server", zap.String("not support message type", msg.t.String())) }