diff --git a/server/cross/message.go b/server/cross/message.go index 60c4c09..337c225 100644 --- a/server/cross/message.go +++ b/server/cross/message.go @@ -1,5 +1,6 @@ package cross +// Message 跨服消息数据结构 type Message struct { ServerId int64 `json:"server_id"` Packet []byte `json:"packet"` diff --git a/server/cross/nats.go b/server/cross/nats.go index d27c80c..86ff5e4 100644 --- a/server/cross/nats.go +++ b/server/cross/nats.go @@ -14,6 +14,7 @@ const ( nasMark = "Cross.Nats" ) +// NewNats 创建一个基于 Nats 实现的跨服消息功能组件 func NewNats(url string, options ...NatsOption) *Nats { n := &Nats{ url: url, @@ -31,6 +32,7 @@ func NewNats(url string, options ...NatsOption) *Nats { return n } +// Nats 基于 Nats 实现的跨服消息功能组件 type Nats struct { conn *nats.Conn url string diff --git a/server/gateway/doc.go b/server/gateway/doc.go new file mode 100644 index 0000000..580630c --- /dev/null +++ b/server/gateway/doc.go @@ -0,0 +1,2 @@ +// Package gateway 是用于处理服务器消息的网关模块,适用于对客户端消息进行处理、转发的情况。 +package gateway diff --git a/server/gateway/endpoint.go b/server/gateway/endpoint.go index a93d8d0..a762bb2 100644 --- a/server/gateway/endpoint.go +++ b/server/gateway/endpoint.go @@ -34,6 +34,14 @@ func NewEndpoint(name string, cli *client.Client, options ...EndpointOption) *En } // Endpoint 网关端点 +// - 每一个端点均表示了一个目标服务,网关会将数据包转发到该端点,由该端点负责将数据包转发到目标服务。 +// - 每个端点会建立一个连接池,默认大小为 DefaultEndpointConnectionPoolSize,可通过 WithEndpointConnectionPoolSize 进行设置。 +// - 网关在转发数据包时会自行根据延迟维护端点健康值,端点健康值越高,网关越倾向于将数据包转发到该端点。 +// - 端点支持连接未中断前始终将数据包转发到特定端点,这样可以保证连接的状态维持。 +// +// 连接池: +// - 连接池大小决定了网关服务器与端点服务器建立的连接数,例如当连接池大小为 1 时,那么所有连接到该端点的客户端都会共用一个连接。 +// - 连接池的设计可以突破单机理论 65535 个 WebSocket 客户端的限制,适当的增大连接池大小可以提高网关服务器的承载能力。 type Endpoint struct { gateway *Gateway client []*client.Client // 端点客户端 @@ -46,8 +54,8 @@ type Endpoint struct { cps int // 端点连接池大小 } -// start 开始与端点建立连接 -func (slf *Endpoint) start(gateway *Gateway, cli *client.Client) { +// start 开始与目标服务端点建立连接 +func (slf *Endpoint) start(cli *client.Client) { for { cur := time.Now().UnixNano() if err := cli.Run(); err == nil { @@ -76,7 +84,7 @@ func (slf *Endpoint) connect(gateway *Gateway) { }) cli.RegConnectionClosedEvent(func(conn *client.Client, err any) { slf.gateway.OnEndpointConnectClosedEvent(slf.gateway, slf) - slf.start(gateway, cli) + slf.start(cli) }) cli.RegConnectionReceivePacketEvent(func(conn *client.Client, wst int, packet []byte) { addr, sendTime, packet, err := UnmarshalGatewayInPacket(packet) @@ -93,7 +101,7 @@ func (slf *Endpoint) connect(gateway *Gateway) { c.SetWST(wst) slf.gateway.OnEndpointConnectReceivePacketEvent(slf.gateway, slf, c, packet) }) - slf.start(gateway, cli) + slf.start(cli) leastOnce.Do(least.Done) }(cli) }