other: 新版 server 包完善

This commit is contained in:
kercylan98 2024-04-01 12:33:05 +08:00
parent 89e868bd1c
commit ffc3421b29
13 changed files with 407 additions and 40 deletions

View File

@ -21,6 +21,9 @@ type Conn interface {
// WriteBytes 写入数据
WriteBytes(data []byte) error
// WriteContext 写入数据
WriteContext(data []byte, context interface{}) error
}
func newConn(c net.Conn, connWriter ConnWriter) *conn {
@ -31,9 +34,9 @@ func newConn(c net.Conn, connWriter ConnWriter) *conn {
}
type conn struct {
conn net.Conn
writer ConnWriter
actor string
conn net.Conn // 连接
writer ConnWriter // 写入器
actor string // Actor 名称
}
func (c *conn) SetActor(actor string) {
@ -56,3 +59,7 @@ func (c *conn) WriteBytes(data []byte) error {
_, err := c.conn.Write(data)
return err
}
func (c *conn) WriteContext(data []byte, context interface{}) error {
return c.writer(NewPacket(data).SetContext(context))
}

View File

@ -20,33 +20,35 @@ func (s *controller) init(srv *server) *controller {
}
func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) {
if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) {
srv.connections[conn] = newConn(conn, writer)
if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
c := newConn(conn, writer)
srv.connections[conn] = c
s.events.onConnectionOpened(c)
})); err != nil {
panic(err)
}
}
func (s *controller) EliminateConnection(conn net.Conn, err error) {
if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) {
if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
c, exist := srv.connections[conn]
if !exist {
return
}
delete(srv.connections, conn)
srv.events.onConnectionClosed(c, err)
})); err != nil {
panic(err)
}
}
func (s *controller) ReactPacket(conn net.Conn, packet Packet) {
if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) {
if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
c, exist := srv.connections[conn]
if !exist {
return
}
if err := srv.reactor.Dispatch(c.GetActor(), HandlerMessage(srv, func(srv *server) {
srv.events.onConnectionReceivePacket(c, packet)
})); err != nil {
panic(err)
}
srv.events.onConnectionReceivePacket(c, packet)
})); err != nil {
panic(err)
}

View File

@ -1,19 +1,39 @@
package server
import "github.com/kercylan98/minotaur/utils/collection/listings"
import (
"github.com/kercylan98/minotaur/utils/collection/listings"
"time"
)
type (
LaunchedEventHandler func(srv Server, ip string, t time.Time)
ConnectionOpenedEventHandler func(srv Server, conn Conn)
ConnectionClosedEventHandler func(srv Server, conn Conn, err error)
ConnectionReceivePacketEventHandler func(srv Server, conn Conn, packet Packet)
)
type Events interface {
// RegisterLaunchedEvent 注册服务器启动事件,当服务器启动后将会触发该事件
// - 该事件将在系统级 Actor 中运行,该事件中阻塞会导致服务器启动延迟
RegisterLaunchedEvent(handler LaunchedEventHandler, priority ...int)
// RegisterConnectionOpenedEvent 注册连接打开事件,当新连接创建完毕时将会触发该事件
// - 该事件将在系统级 Actor 中运行,不应执行阻塞操作
RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int)
// RegisterConnectionClosedEvent 注册连接关闭事件,当连接关闭后将会触发该事件
// - 该事件将在系统级 Actor 中运行,不应执行阻塞操作
RegisterConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int)
// RegisterConnectionReceivePacketEvent 注册连接接收数据包事件,当连接接收到数据包后将会触发该事件
// - 该事件将在连接的 Actor 中运行,不应执行阻塞操作
RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int)
}
type events struct {
*server
connectionReceivePacketEventHandlers listings.PrioritySlice[ConnectionReceivePacketEventHandler]
launchedEventHandlers listings.SyncPrioritySlice[LaunchedEventHandler]
connectionOpenedEventHandlers listings.SyncPrioritySlice[ConnectionOpenedEventHandler]
connectionClosedEventHandlers listings.SyncPrioritySlice[ConnectionClosedEventHandler]
connectionReceivePacketEventHandlers listings.SyncPrioritySlice[ConnectionReceivePacketEventHandler]
}
func (s *events) init(srv *server) *events {
@ -21,13 +41,54 @@ func (s *events) init(srv *server) *events {
return s
}
func (s *events) RegisterLaunchedEvent(handler LaunchedEventHandler, priority ...int) {
s.launchedEventHandlers.AppendByOptionalPriority(handler, priority...)
}
func (s *events) onLaunched() {
_ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
s.launchedEventHandlers.RangeValue(func(index int, value LaunchedEventHandler) bool {
value(s.server, s.server.state.Ip, s.server.state.LaunchedAt)
return true
})
}))
}
func (s *events) RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int) {
s.connectionOpenedEventHandlers.AppendByOptionalPriority(handler, priority...)
}
func (s *events) onConnectionOpened(conn Conn) {
_ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
s.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool {
value(s.server, conn)
return true
})
}))
}
func (s *events) RegisterConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int) {
s.connectionClosedEventHandlers.AppendByOptionalPriority(handler, priority...)
}
func (s *events) onConnectionClosed(conn Conn, err error) {
_ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
s.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool {
value(s.server, conn, err)
return true
})
}))
}
func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) {
s.connectionReceivePacketEventHandlers.AppendByOptionalPriority(handler, priority...)
}
func (s *events) onConnectionReceivePacket(conn Conn, packet Packet) {
s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool {
value(s.server, conn, packet)
return true
})
_ = s.server.reactor.AutoDispatch(conn.GetActor(), NativeMessage(s.server, func(srv *server) {
s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool {
value(s.server, conn, packet)
return true
})
}))
}

View File

@ -4,15 +4,15 @@ type Message interface {
Execute()
}
func HandlerMessage(srv *server, handler func(srv *server)) Message {
return &handlerMessage{srv: srv, handler: handler}
func NativeMessage(srv *server, handler func(srv *server)) Message {
return &nativeMessage{srv: srv, handler: handler}
}
type handlerMessage struct {
type nativeMessage struct {
srv *server
handler func(srv *server)
}
func (s *handlerMessage) Execute() {
func (s *nativeMessage) Execute() {
s.handler(s.srv)
}

View File

@ -5,7 +5,6 @@ import (
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/utils/log"
"github.com/panjf2000/gnet/v2"
"time"
)
@ -35,9 +34,6 @@ func (w *websocketHandler) OnShutdown(eng gnet.Engine) {
func (w *websocketHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
wrapper := newWebsocketWrapper(c)
c.SetContext(wrapper)
w.controller.RegisterConnection(c, func(packet server.Packet) error {
return wsutil.WriteServerMessage(c, packet.GetContext().(ws.OpCode), packet.GetBytes())
})
return
}
@ -49,15 +45,16 @@ func (w *websocketHandler) OnClose(c gnet.Conn, err error) (action gnet.Action)
func (w *websocketHandler) OnTraffic(c gnet.Conn) (action gnet.Action) {
wrapper := c.Context().(*websocketWrapper)
// read to buffer
if err := wrapper.readToBuffer(); err != nil {
log.Error("websocket", log.Err(err))
return gnet.Close
}
// check or upgrade
if err := wrapper.upgrade(w.upgrader); err != nil {
log.Error("websocket", log.Err(err))
if err := wrapper.upgrade(w.upgrader, func() {
// 协议升级成功后视为连接建立
w.controller.RegisterConnection(c, func(packet server.Packet) error {
return wsutil.WriteServerMessage(c, packet.GetContext().(ws.OpCode), packet.GetBytes())
})
}); err != nil {
return gnet.Close
}
wrapper.active = time.Now()
@ -65,7 +62,7 @@ func (w *websocketHandler) OnTraffic(c gnet.Conn) (action gnet.Action) {
// decode
messages, err := wrapper.decode()
if err != nil {
log.Error("websocket", log.Err(err))
return gnet.Close
}
for _, message := range messages {
@ -73,7 +70,6 @@ func (w *websocketHandler) OnTraffic(c gnet.Conn) (action gnet.Action) {
packet.SetContext(message.OpCode)
w.controller.ReactPacket(c, packet)
}
return
}

View File

@ -50,7 +50,7 @@ func (w *websocketWrapper) readToBuffer() error {
}
// upgrade 升级
func (w *websocketWrapper) upgrade(upgrader ws.Upgrader) (err error) {
func (w *websocketWrapper) upgrade(upgrader ws.Upgrader, upgradedHandler func()) (err error) {
if w.upgraded {
return
}
@ -73,6 +73,7 @@ func (w *websocketWrapper) upgrade(upgrader ws.Upgrader) (err error) {
}
buf.Next(skip)
w.upgraded = true
upgradedHandler()
return
}

View File

@ -1,4 +0,0 @@
package reactor
type Event struct {
}

View File

@ -106,6 +106,14 @@ func (r *Reactor[M]) SetDebug(debug bool) *Reactor[M] {
return r
}
// AutoDispatch 自动分发,当 ident 为空字符串时,分发到系统级别的队列,否则分发到 ident 使用的队列
func (r *Reactor[M]) AutoDispatch(ident string, msg M) error {
if ident == "" {
return r.SystemDispatch(msg)
}
return r.Dispatch(ident, msg)
}
// SystemDispatch 将消息分发到系统级别的队列
func (r *Reactor[M]) SystemDispatch(msg M) error {
if atomic.LoadInt32(&r.state) > StatusRunning {

View File

@ -2,19 +2,31 @@ package server
import (
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
"github.com/kercylan98/minotaur/utils/network"
"golang.org/x/net/context"
"os"
"os/signal"
"syscall"
"time"
)
type Server interface {
Events
// Run 运行服务器
Run() error
// Shutdown 关闭服务器
Shutdown() error
// GetStatus 获取服务器状态
GetStatus() *State
}
type server struct {
*controller
*events
state *State
ctx context.Context
cancel context.CancelFunc
network Network
@ -31,6 +43,7 @@ func NewServer(network Network) Server {
srv.reactor = reactor.NewReactor[Message](1024*8, 1024, func(msg Message) {
msg.Execute()
}, nil)
srv.state = new(State).init(srv)
return srv
}
@ -40,7 +53,18 @@ func (s *server) Run() (err error) {
return
}
if err = s.network.OnRun(); err != nil {
ip, _ := network.IP()
s.state.onLaunched(ip.String(), time.Now())
go func(s *server) {
if err = s.network.OnRun(); err != nil {
panic(err)
}
}(s)
var systemSignal = make(chan os.Signal)
signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
<-systemSignal
if err := s.Shutdown(); err != nil {
panic(err)
}
return
@ -52,3 +76,7 @@ func (s *server) Shutdown() (err error) {
s.reactor.Close()
return
}
func (s *server) GetStatus() *State {
return s.state.Status()
}

View File

@ -1,18 +1,32 @@
package server_test
import (
"github.com/gobwas/ws"
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/server/internal/v2/network"
"testing"
"time"
)
func TestNewServer(t *testing.T) {
srv := server.NewServer(network.WebSocket(":9999"))
srv.RegisterLaunchedEvent(func(srv server.Server, ip string, launchedAt time.Time) {
t.Log("launched", ip, launchedAt)
})
srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) {
if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil {
t.Error(err)
}
})
srv.RegisterConnectionReceivePacketEvent(func(srv server.Server, conn server.Conn, packet server.Packet) {
if err := conn.WritePacket(packet); err != nil {
panic(err)
}
})
if err := srv.Run(); err != nil {
panic(err)
}

View File

@ -0,0 +1,27 @@
package server
import "time"
type State struct {
server *server
LaunchedAt time.Time // 服务器启动时间
Ip string // 服务器 IP 地址
}
func (s *State) init(srv *server) *State {
s.server = srv
return s
}
func (s *State) Status() *State {
return &State{
LaunchedAt: s.LaunchedAt,
Ip: s.Ip,
}
}
func (s *State) onLaunched(ip string, t time.Time) {
s.LaunchedAt = t
s.Ip = ip
s.server.events.onLaunched()
}

View File

@ -0,0 +1,209 @@
package listings
import (
"fmt"
"sort"
"sync"
)
// NewSyncPrioritySlice 创建一个并发安全的优先级切片,优先级越低越靠前
func NewSyncPrioritySlice[V any](lengthAndCap ...int) *SyncPrioritySlice[V] {
p := &SyncPrioritySlice[V]{}
if len(lengthAndCap) > 0 {
var length = lengthAndCap[0]
var c int
if len(lengthAndCap) > 1 {
c = lengthAndCap[1]
}
p.items = make([]*priorityItem[V], length, c)
}
return p
}
// SyncPrioritySlice 是一个优先级切片,优先级越低越靠前
type SyncPrioritySlice[V any] struct {
rw sync.RWMutex
items []*priorityItem[V]
}
// Len 返回切片长度
func (slf *SyncPrioritySlice[V]) Len() int {
slf.rw.RLock()
defer slf.rw.RUnlock()
return len(slf.items)
}
// Cap 返回切片容量
func (slf *SyncPrioritySlice[V]) Cap() int {
slf.rw.RLock()
defer slf.rw.RUnlock()
return cap(slf.items)
}
// Clear 清空切片
func (slf *SyncPrioritySlice[V]) Clear() {
slf.rw.Lock()
defer slf.rw.Unlock()
slf.items = slf.items[:0]
}
// Append 添加元素
func (slf *SyncPrioritySlice[V]) Append(v V, p int) {
slf.rw.Lock()
defer slf.rw.Unlock()
slf.items = append(slf.items, &priorityItem[V]{
v: v,
p: p,
})
slf.sort()
}
// Appends 添加元素
func (slf *SyncPrioritySlice[V]) Appends(priority int, vs ...V) {
for _, v := range vs {
slf.Append(v, priority)
}
slf.sort()
}
// AppendByOptionalPriority 添加元素
func (slf *SyncPrioritySlice[V]) AppendByOptionalPriority(v V, priority ...int) {
if len(priority) == 0 {
slf.Append(v, 0)
} else {
slf.Append(v, priority[0])
}
}
// Get 获取元素
func (slf *SyncPrioritySlice[V]) Get(index int) (V, int) {
slf.rw.RLock()
defer slf.rw.RUnlock()
i := slf.items[index]
return i.Value(), i.Priority()
}
// GetValue 获取元素值
func (slf *SyncPrioritySlice[V]) GetValue(index int) V {
slf.rw.RLock()
defer slf.rw.RUnlock()
return slf.items[index].Value()
}
// GetPriority 获取元素优先级
func (slf *SyncPrioritySlice[V]) GetPriority(index int) int {
slf.rw.RLock()
defer slf.rw.RUnlock()
return slf.items[index].Priority()
}
// Set 设置元素
func (slf *SyncPrioritySlice[V]) Set(index int, value V, priority int) {
slf.rw.Lock()
defer slf.rw.Unlock()
before := slf.items[index]
slf.items[index] = &priorityItem[V]{
v: value,
p: priority,
}
if before.Priority() != priority {
slf.sort()
}
}
// SetValue 设置元素值
func (slf *SyncPrioritySlice[V]) SetValue(index int, value V) {
slf.rw.Lock()
defer slf.rw.Unlock()
slf.items[index].v = value
}
// SetPriority 设置元素优先级
func (slf *SyncPrioritySlice[V]) SetPriority(index int, priority int) {
slf.rw.Lock()
defer slf.rw.Unlock()
slf.items[index].p = priority
slf.sort()
}
// Action 直接操作切片,如果返回值不为 nil则替换切片
func (slf *SyncPrioritySlice[V]) Action(action func(items []*priorityItem[V]) []*priorityItem[V]) {
slf.rw.Lock()
defer slf.rw.Unlock()
if len(slf.items) == 0 {
return
}
if replace := action(slf.items); replace != nil {
slf.items = replace
slf.sort()
}
}
// Range 遍历切片,如果返回值为 false则停止遍历
func (slf *SyncPrioritySlice[V]) Range(action func(index int, item *priorityItem[V]) bool) {
slf.rw.RLock()
defer slf.rw.RUnlock()
for i, item := range slf.items {
if !action(i, item) {
break
}
}
}
// RangeValue 遍历切片值,如果返回值为 false则停止遍历
func (slf *SyncPrioritySlice[V]) RangeValue(action func(index int, value V) bool) {
slf.Range(func(index int, item *priorityItem[V]) bool {
return action(index, item.Value())
})
}
// RangePriority 遍历切片优先级,如果返回值为 false则停止遍历
func (slf *SyncPrioritySlice[V]) RangePriority(action func(index int, priority int) bool) {
slf.Range(func(index int, item *priorityItem[V]) bool {
return action(index, item.Priority())
})
}
// Slice 返回切片
func (slf *SyncPrioritySlice[V]) Slice() []V {
slf.rw.RLock()
defer slf.rw.RUnlock()
var vs []V
for _, item := range slf.items {
vs = append(vs, item.Value())
}
return vs
}
// String 返回切片字符串
func (slf *SyncPrioritySlice[V]) String() string {
slf.rw.RLock()
defer slf.rw.RUnlock()
var vs []V
for _, item := range slf.items {
vs = append(vs, item.Value())
}
return fmt.Sprint(vs)
}
// sort 排序
func (slf *SyncPrioritySlice[V]) sort() {
if len(slf.items) <= 1 {
return
}
sort.Slice(slf.items, func(i, j int) bool {
return slf.items[i].Priority() < slf.items[j].Priority()
})
for i := 0; i < len(slf.items); i++ {
if i == 0 {
slf.items[i].prev = nil
slf.items[i].next = slf.items[i+1]
} else if i == len(slf.items)-1 {
slf.items[i].prev = slf.items[i-1]
slf.items[i].next = nil
} else {
slf.items[i].prev = slf.items[i-1]
slf.items[i].next = slf.items[i+1]
}
}
}

18
utils/super/ip.go Normal file
View File

@ -0,0 +1,18 @@
package super
import (
"net"
)
// IP 返回本机出站地址
func IP() (ip net.IP, err error) {
var conn net.Conn
conn, err = net.Dial("udp", "8.8.8.8:80")
if err != nil {
return
}
_ = conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr)
ip = localAddr.IP
return
}