vRp.CD2g_test/server/conn.go

372 lines
9.1 KiB
Go

package server
import (
"context"
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/kercylan98/minotaur/server/writeloop"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/hash"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/random"
"github.com/kercylan98/minotaur/utils/timer"
"github.com/panjf2000/gnet"
"github.com/xtaci/kcp-go/v5"
"io"
"net"
"net/http"
"os"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"
)
var wsRequestKey = fmt.Sprintf("WS:REQ:%s", strings.ToUpper(random.HostName()))
// newKcpConn 创建一个处理KCP的连接
func newKcpConn(server *Server, session *kcp.UDPSession) *Conn {
c := &Conn{
ctx: server.ctx,
connection: &connection{
server: server,
remoteAddr: session.RemoteAddr(),
ip: session.RemoteAddr().String(),
kcp: session,
data: map[any]any{},
openTime: time.Now(),
},
}
if index := strings.LastIndex(c.ip, ":"); index != -1 {
c.ip = c.ip[0:index]
}
c.init()
return c
}
// newKcpConn 创建一个处理GNet的连接
func newGNetConn(server *Server, conn gnet.Conn) *Conn {
c := &Conn{
ctx: server.ctx,
connection: &connection{
server: server,
remoteAddr: conn.RemoteAddr(),
ip: conn.RemoteAddr().String(),
gn: conn,
data: map[any]any{},
openTime: time.Now(),
},
}
if index := strings.LastIndex(c.ip, ":"); index != -1 {
c.ip = c.ip[0:index]
}
c.init()
return c
}
// newKcpConn 创建一个处理WebSocket的连接
func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn {
c := &Conn{
ctx: server.ctx,
connection: &connection{
server: server,
remoteAddr: ws.RemoteAddr(),
ip: ip,
ws: ws,
data: map[any]any{},
openTime: time.Now(),
},
}
c.init()
return c
}
// newBotConn 创建一个适用于测试等情况的机器人连接
func newBotConn(server *Server) *Conn {
ip, port := random.NetIP(), random.Port()
var writer io.Writer = os.Stdout
c := &Conn{
ctx: server.ctx,
connection: &connection{
server: server,
remoteAddr: &net.TCPAddr{
IP: ip,
Port: port,
Zone: "",
},
ip: fmt.Sprintf("BOT:%s:%d", ip.String(), port),
data: map[any]any{},
openTime: time.Now(),
},
}
c.botWriter.Store(&writer)
c.init()
return c
}
// Conn 服务器连接单次消息的包装
type Conn struct {
*connection
wst int
ctx context.Context
}
// connection 长久保持的连接
type connection struct {
server *Server
ticker *timer.Ticker
remoteAddr net.Addr
ip string
ws *websocket.Conn
gn gnet.Conn
kcp *kcp.UDPSession
gw func(packet []byte)
data map[any]any
closed bool
pool *concurrent.Pool[*connPacket]
loop *writeloop.WriteLoop[*connPacket]
mu sync.Mutex
openTime time.Time
delay time.Duration
fluctuation time.Duration
botWriter atomic.Pointer[io.Writer]
}
// Ticker 获取定时器
func (slf *Conn) Ticker() *timer.Ticker {
return slf.ticker
}
// GetServer 获取服务器
func (slf *Conn) GetServer() *Server {
return slf.server
}
// GetOpenTime 获取连接打开时间
func (slf *Conn) GetOpenTime() time.Time {
return slf.openTime
}
// GetOnlineTime 获取连接在线时长
func (slf *Conn) GetOnlineTime() time.Duration {
return time.Now().Sub(slf.openTime)
}
// GetWebsocketRequest 获取websocket请求
func (slf *Conn) GetWebsocketRequest() *http.Request {
return slf.GetData(wsRequestKey).(*http.Request)
}
// IsBot 是否是机器人连接
func (slf *Conn) IsBot() bool {
return slf != nil && slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil
}
// RemoteAddr 获取远程地址
func (slf *Conn) RemoteAddr() net.Addr {
return slf.remoteAddr
}
// GetID 获取连接ID
// - 为远程地址的字符串形式
func (slf *Conn) GetID() string {
if slf.IsBot() {
return slf.ip
}
return slf.remoteAddr.String()
}
// GetIP 获取连接IP
func (slf *Conn) GetIP() string {
return slf.ip
}
// IsClosed 是否已经关闭
func (slf *Conn) IsClosed() bool {
slf.mu.Lock()
defer slf.mu.Unlock()
return slf.closed
}
// SetData 设置连接数据,该数据将在连接关闭前始终存在
func (slf *Conn) SetData(key, value any) *Conn {
slf.data[key] = value
return slf
}
// GetData 获取连接数据
func (slf *Conn) GetData(key any) any {
return slf.data[key]
}
// ViewData 查看只读的连接数据
func (slf *Conn) ViewData() map[any]any {
return hash.Copy(slf.data)
}
// SetMessageData 设置消息数据,该数据将在消息处理完成后释放
func (slf *Conn) SetMessageData(key, value any) *Conn {
slf.ctx = context.WithValue(slf.ctx, key, value)
return slf
}
// GetMessageData 获取消息数据
func (slf *Conn) GetMessageData(key any) any {
return slf.ctx.Value(key)
}
// ReleaseData 释放数据
func (slf *Conn) ReleaseData() *Conn {
for k := range slf.data {
delete(slf.data, k)
}
return slf
}
// IsWebsocket 是否是websocket连接
func (slf *Conn) IsWebsocket() bool {
return slf.server.network == NetworkWebsocket
}
// GetWST 获取本次 websocket 消息类型
// - 默认将与发送类型相同
func (slf *Conn) GetWST() int {
return slf.wst
}
// SetWST 设置本次 websocket 消息类型
func (slf *Conn) SetWST(wst int) *Conn {
slf.wst = wst
return slf
}
// PushAsyncMessage 推送异步消息,该消息将通过 Server.PushShuntAsyncMessage 函数推送
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Conn) PushAsyncMessage(caller func() error, callback func(err error), mark ...log.Field) {
slf.server.PushShuntAsyncMessage(slf, caller, callback, mark...)
}
// PushUniqueAsyncMessage 推送唯一异步消息,该消息将通过 Server.PushUniqueShuntAsyncMessage 函数推送
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息
func (slf *Conn) PushUniqueAsyncMessage(name string, caller func() error, callback func(err error), mark ...log.Field) {
slf.server.PushUniqueShuntAsyncMessage(slf, name, caller, callback, mark...)
}
// Write 向连接中写入数据
func (slf *Conn) Write(packet []byte, callback ...func(err error)) {
if slf.gw != nil {
slf.gw(packet)
return
}
packet = slf.server.OnConnectionWritePacketBeforeEvent(slf, packet)
slf.mu.Lock()
defer slf.mu.Unlock()
if slf.closed {
return
}
cp := slf.pool.Get()
cp.wst = slf.GetWST()
cp.packet = packet
if len(callback) > 0 {
cp.callback = callback[0]
}
slf.loop.Put(cp)
}
func (slf *Conn) init() {
if slf.server.ticker != nil && slf.server.connTickerSize > 0 {
if slf.server.tickerAutonomy {
slf.ticker = slf.server.tickerPool.GetTicker(slf.server.connTickerSize)
} else {
slf.ticker = slf.server.tickerPool.GetTicker(slf.server.connTickerSize, timer.WithCaller(func(name string, caller func()) {
slf.server.PushShuntTickerMessage(slf, name, caller)
}))
}
}
slf.pool = concurrent.NewPool[*connPacket](10*1024,
func() *connPacket {
return &connPacket{}
}, func(data *connPacket) {
data.wst = 0
data.packet = nil
data.callback = nil
},
)
slf.loop = writeloop.NewWriteLoop[*connPacket](slf.pool, func(data *connPacket) error {
if slf.server.runtime.packetWarnSize > 0 && len(data.packet) > slf.server.runtime.packetWarnSize {
log.Warn("Conn.Write", log.String("State", "PacketWarn"), log.String("Reason", "PacketSize"), log.String("ID", slf.GetID()), log.Int("PacketSize", len(data.packet)))
}
var err error
if slf.delay > 0 || slf.fluctuation > 0 {
time.Sleep(random.Duration(int64(slf.delay-slf.fluctuation), int64(slf.delay+slf.fluctuation)))
_, err = (*slf.botWriter.Load()).Write(data.packet)
if data.callback != nil {
data.callback(err)
}
return err
}
if slf.IsWebsocket() {
err = slf.ws.WriteMessage(data.wst, data.packet)
} else {
if slf.gn != nil {
switch slf.server.network {
case NetworkUdp, NetworkUdp4, NetworkUdp6:
err = slf.gn.SendTo(data.packet)
default:
err = slf.gn.AsyncWrite(data.packet)
}
} else if slf.kcp != nil {
_, err = slf.kcp.Write(data.packet)
}
}
if data.callback != nil {
data.callback(err)
}
return err
}, func(err any) {
slf.Close(errors.New(fmt.Sprint(err)))
})
}
// Close 关闭连接
func (slf *Conn) Close(err ...error) {
slf.mu.Lock()
if slf.closed {
if slf.ticker != nil {
slf.ticker.Release()
}
slf.mu.Unlock()
return
}
defer func() {
if err := recover(); err != nil {
log.Error("Conn.Close", log.String("State", "Panic"), log.Any("Error", err))
debug.PrintStack()
slf.mu.Unlock()
}
}()
slf.closed = true
if slf.ws != nil {
_ = slf.ws.Close()
} else if slf.gn != nil {
_ = slf.gn.Close()
} else if slf.kcp != nil {
_ = slf.kcp.Close()
}
if slf.ticker != nil {
slf.ticker.Release()
}
slf.server.releaseDispatcher(slf)
slf.pool.Close()
slf.loop.Close()
slf.mu.Unlock()
if len(err) > 0 {
slf.server.OnConnectionClosedEvent(slf, err[0])
return
}
slf.server.OnConnectionClosedEvent(slf, nil)
}