diff --git a/server/internal/v2/conn.go b/server/internal/v2/conn.go index ce2e92f..d03382d 100644 --- a/server/internal/v2/conn.go +++ b/server/internal/v2/conn.go @@ -21,6 +21,9 @@ type Conn interface { // WriteBytes 写入数据 WriteBytes(data []byte) error + + // WriteContext 写入数据 + WriteContext(data []byte, context interface{}) error } func newConn(c net.Conn, connWriter ConnWriter) *conn { @@ -31,9 +34,9 @@ func newConn(c net.Conn, connWriter ConnWriter) *conn { } type conn struct { - conn net.Conn - writer ConnWriter - actor string + conn net.Conn // 连接 + writer ConnWriter // 写入器 + actor string // Actor 名称 } func (c *conn) SetActor(actor string) { @@ -56,3 +59,7 @@ func (c *conn) WriteBytes(data []byte) error { _, err := c.conn.Write(data) return err } + +func (c *conn) WriteContext(data []byte, context interface{}) error { + return c.writer(NewPacket(data).SetContext(context)) +} diff --git a/server/internal/v2/controller.go b/server/internal/v2/controller.go index 4f3f54b..d4600e0 100644 --- a/server/internal/v2/controller.go +++ b/server/internal/v2/controller.go @@ -20,33 +20,35 @@ func (s *controller) init(srv *server) *controller { } func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) { - if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) { - srv.connections[conn] = newConn(conn, writer) + if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { + c := newConn(conn, writer) + srv.connections[conn] = c + s.events.onConnectionOpened(c) })); err != nil { panic(err) } } func (s *controller) EliminateConnection(conn net.Conn, err error) { - if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) { + if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { + c, exist := srv.connections[conn] + if !exist { + return + } delete(srv.connections, conn) + srv.events.onConnectionClosed(c, err) })); err != nil { panic(err) } } func (s *controller) ReactPacket(conn net.Conn, packet Packet) { - if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) { + if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { c, exist := srv.connections[conn] if !exist { return } - - if err := srv.reactor.Dispatch(c.GetActor(), HandlerMessage(srv, func(srv *server) { - srv.events.onConnectionReceivePacket(c, packet) - })); err != nil { - panic(err) - } + srv.events.onConnectionReceivePacket(c, packet) })); err != nil { panic(err) } diff --git a/server/internal/v2/events.go b/server/internal/v2/events.go index c806dd6..f422656 100644 --- a/server/internal/v2/events.go +++ b/server/internal/v2/events.go @@ -1,19 +1,39 @@ package server -import "github.com/kercylan98/minotaur/utils/collection/listings" +import ( + "github.com/kercylan98/minotaur/utils/collection/listings" + "time" +) type ( + LaunchedEventHandler func(srv Server, ip string, t time.Time) + ConnectionOpenedEventHandler func(srv Server, conn Conn) + ConnectionClosedEventHandler func(srv Server, conn Conn, err error) ConnectionReceivePacketEventHandler func(srv Server, conn Conn, packet Packet) ) type Events interface { + // RegisterLaunchedEvent 注册服务器启动事件,当服务器启动后将会触发该事件 + // - 该事件将在系统级 Actor 中运行,该事件中阻塞会导致服务器启动延迟 + RegisterLaunchedEvent(handler LaunchedEventHandler, priority ...int) + // RegisterConnectionOpenedEvent 注册连接打开事件,当新连接创建完毕时将会触发该事件 + // - 该事件将在系统级 Actor 中运行,不应执行阻塞操作 + RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int) + // RegisterConnectionClosedEvent 注册连接关闭事件,当连接关闭后将会触发该事件 + // - 该事件将在系统级 Actor 中运行,不应执行阻塞操作 + RegisterConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int) + // RegisterConnectionReceivePacketEvent 注册连接接收数据包事件,当连接接收到数据包后将会触发该事件 + // - 该事件将在连接的 Actor 中运行,不应执行阻塞操作 RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) } type events struct { *server - connectionReceivePacketEventHandlers listings.PrioritySlice[ConnectionReceivePacketEventHandler] + launchedEventHandlers listings.SyncPrioritySlice[LaunchedEventHandler] + connectionOpenedEventHandlers listings.SyncPrioritySlice[ConnectionOpenedEventHandler] + connectionClosedEventHandlers listings.SyncPrioritySlice[ConnectionClosedEventHandler] + connectionReceivePacketEventHandlers listings.SyncPrioritySlice[ConnectionReceivePacketEventHandler] } func (s *events) init(srv *server) *events { @@ -21,13 +41,54 @@ func (s *events) init(srv *server) *events { return s } +func (s *events) RegisterLaunchedEvent(handler LaunchedEventHandler, priority ...int) { + s.launchedEventHandlers.AppendByOptionalPriority(handler, priority...) +} + +func (s *events) onLaunched() { + _ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { + s.launchedEventHandlers.RangeValue(func(index int, value LaunchedEventHandler) bool { + value(s.server, s.server.state.Ip, s.server.state.LaunchedAt) + return true + }) + })) +} + +func (s *events) RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int) { + s.connectionOpenedEventHandlers.AppendByOptionalPriority(handler, priority...) +} + +func (s *events) onConnectionOpened(conn Conn) { + _ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { + s.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool { + value(s.server, conn) + return true + }) + })) +} + +func (s *events) RegisterConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int) { + s.connectionClosedEventHandlers.AppendByOptionalPriority(handler, priority...) +} + +func (s *events) onConnectionClosed(conn Conn, err error) { + _ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { + s.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool { + value(s.server, conn, err) + return true + }) + })) +} + func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) { s.connectionReceivePacketEventHandlers.AppendByOptionalPriority(handler, priority...) } func (s *events) onConnectionReceivePacket(conn Conn, packet Packet) { - s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool { - value(s.server, conn, packet) - return true - }) + _ = s.server.reactor.AutoDispatch(conn.GetActor(), NativeMessage(s.server, func(srv *server) { + s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool { + value(s.server, conn, packet) + return true + }) + })) } diff --git a/server/internal/v2/message.go b/server/internal/v2/message.go index 60fbfb2..77c737b 100644 --- a/server/internal/v2/message.go +++ b/server/internal/v2/message.go @@ -4,15 +4,15 @@ type Message interface { Execute() } -func HandlerMessage(srv *server, handler func(srv *server)) Message { - return &handlerMessage{srv: srv, handler: handler} +func NativeMessage(srv *server, handler func(srv *server)) Message { + return &nativeMessage{srv: srv, handler: handler} } -type handlerMessage struct { +type nativeMessage struct { srv *server handler func(srv *server) } -func (s *handlerMessage) Execute() { +func (s *nativeMessage) Execute() { s.handler(s.srv) } diff --git a/server/internal/v2/network/websocket_handler.go b/server/internal/v2/network/websocket_handler.go index 1f0de8d..a477c39 100644 --- a/server/internal/v2/network/websocket_handler.go +++ b/server/internal/v2/network/websocket_handler.go @@ -5,7 +5,6 @@ import ( "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" "github.com/kercylan98/minotaur/server/internal/v2" - "github.com/kercylan98/minotaur/utils/log" "github.com/panjf2000/gnet/v2" "time" ) @@ -35,9 +34,6 @@ func (w *websocketHandler) OnShutdown(eng gnet.Engine) { func (w *websocketHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { wrapper := newWebsocketWrapper(c) c.SetContext(wrapper) - w.controller.RegisterConnection(c, func(packet server.Packet) error { - return wsutil.WriteServerMessage(c, packet.GetContext().(ws.OpCode), packet.GetBytes()) - }) return } @@ -49,15 +45,16 @@ func (w *websocketHandler) OnClose(c gnet.Conn, err error) (action gnet.Action) func (w *websocketHandler) OnTraffic(c gnet.Conn) (action gnet.Action) { wrapper := c.Context().(*websocketWrapper) - // read to buffer if err := wrapper.readToBuffer(); err != nil { - log.Error("websocket", log.Err(err)) return gnet.Close } - // check or upgrade - if err := wrapper.upgrade(w.upgrader); err != nil { - log.Error("websocket", log.Err(err)) + if err := wrapper.upgrade(w.upgrader, func() { + // 协议升级成功后视为连接建立 + w.controller.RegisterConnection(c, func(packet server.Packet) error { + return wsutil.WriteServerMessage(c, packet.GetContext().(ws.OpCode), packet.GetBytes()) + }) + }); err != nil { return gnet.Close } wrapper.active = time.Now() @@ -65,7 +62,7 @@ func (w *websocketHandler) OnTraffic(c gnet.Conn) (action gnet.Action) { // decode messages, err := wrapper.decode() if err != nil { - log.Error("websocket", log.Err(err)) + return gnet.Close } for _, message := range messages { @@ -73,7 +70,6 @@ func (w *websocketHandler) OnTraffic(c gnet.Conn) (action gnet.Action) { packet.SetContext(message.OpCode) w.controller.ReactPacket(c, packet) } - return } diff --git a/server/internal/v2/network/wsbsocket_wrapper.go b/server/internal/v2/network/wsbsocket_wrapper.go index 3bc155e..7bc466e 100644 --- a/server/internal/v2/network/wsbsocket_wrapper.go +++ b/server/internal/v2/network/wsbsocket_wrapper.go @@ -50,7 +50,7 @@ func (w *websocketWrapper) readToBuffer() error { } // upgrade 升级 -func (w *websocketWrapper) upgrade(upgrader ws.Upgrader) (err error) { +func (w *websocketWrapper) upgrade(upgrader ws.Upgrader, upgradedHandler func()) (err error) { if w.upgraded { return } @@ -73,6 +73,7 @@ func (w *websocketWrapper) upgrade(upgrader ws.Upgrader) (err error) { } buf.Next(skip) w.upgraded = true + upgradedHandler() return } diff --git a/server/internal/v2/reactor/event.go b/server/internal/v2/reactor/event.go deleted file mode 100644 index 7145683..0000000 --- a/server/internal/v2/reactor/event.go +++ /dev/null @@ -1,4 +0,0 @@ -package reactor - -type Event struct { -} diff --git a/server/internal/v2/reactor/reactor.go b/server/internal/v2/reactor/reactor.go index 92e893c..c5656d2 100644 --- a/server/internal/v2/reactor/reactor.go +++ b/server/internal/v2/reactor/reactor.go @@ -106,6 +106,14 @@ func (r *Reactor[M]) SetDebug(debug bool) *Reactor[M] { return r } +// AutoDispatch 自动分发,当 ident 为空字符串时,分发到系统级别的队列,否则分发到 ident 使用的队列 +func (r *Reactor[M]) AutoDispatch(ident string, msg M) error { + if ident == "" { + return r.SystemDispatch(msg) + } + return r.Dispatch(ident, msg) +} + // SystemDispatch 将消息分发到系统级别的队列 func (r *Reactor[M]) SystemDispatch(msg M) error { if atomic.LoadInt32(&r.state) > StatusRunning { diff --git a/server/internal/v2/server.go b/server/internal/v2/server.go index 005b842..c507f9d 100644 --- a/server/internal/v2/server.go +++ b/server/internal/v2/server.go @@ -2,19 +2,31 @@ package server import ( "github.com/kercylan98/minotaur/server/internal/v2/reactor" + "github.com/kercylan98/minotaur/utils/network" "golang.org/x/net/context" + "os" + "os/signal" + "syscall" + "time" ) type Server interface { Events + // Run 运行服务器 Run() error + + // Shutdown 关闭服务器 Shutdown() error + + // GetStatus 获取服务器状态 + GetStatus() *State } type server struct { *controller *events + state *State ctx context.Context cancel context.CancelFunc network Network @@ -31,6 +43,7 @@ func NewServer(network Network) Server { srv.reactor = reactor.NewReactor[Message](1024*8, 1024, func(msg Message) { msg.Execute() }, nil) + srv.state = new(State).init(srv) return srv } @@ -40,7 +53,18 @@ func (s *server) Run() (err error) { return } - if err = s.network.OnRun(); err != nil { + ip, _ := network.IP() + s.state.onLaunched(ip.String(), time.Now()) + go func(s *server) { + if err = s.network.OnRun(); err != nil { + panic(err) + } + }(s) + + var systemSignal = make(chan os.Signal) + signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) + <-systemSignal + if err := s.Shutdown(); err != nil { panic(err) } return @@ -52,3 +76,7 @@ func (s *server) Shutdown() (err error) { s.reactor.Close() return } + +func (s *server) GetStatus() *State { + return s.state.Status() +} diff --git a/server/internal/v2/server_test.go b/server/internal/v2/server_test.go index 9e339e8..c4f940c 100644 --- a/server/internal/v2/server_test.go +++ b/server/internal/v2/server_test.go @@ -1,18 +1,32 @@ package server_test import ( + "github.com/gobwas/ws" "github.com/kercylan98/minotaur/server/internal/v2" "github.com/kercylan98/minotaur/server/internal/v2/network" "testing" + "time" ) func TestNewServer(t *testing.T) { srv := server.NewServer(network.WebSocket(":9999")) + + srv.RegisterLaunchedEvent(func(srv server.Server, ip string, launchedAt time.Time) { + t.Log("launched", ip, launchedAt) + }) + + srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) { + if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil { + t.Error(err) + } + }) + srv.RegisterConnectionReceivePacketEvent(func(srv server.Server, conn server.Conn, packet server.Packet) { if err := conn.WritePacket(packet); err != nil { panic(err) } }) + if err := srv.Run(); err != nil { panic(err) } diff --git a/server/internal/v2/state.go b/server/internal/v2/state.go new file mode 100644 index 0000000..389741a --- /dev/null +++ b/server/internal/v2/state.go @@ -0,0 +1,27 @@ +package server + +import "time" + +type State struct { + server *server + LaunchedAt time.Time // 服务器启动时间 + Ip string // 服务器 IP 地址 +} + +func (s *State) init(srv *server) *State { + s.server = srv + return s +} + +func (s *State) Status() *State { + return &State{ + LaunchedAt: s.LaunchedAt, + Ip: s.Ip, + } +} + +func (s *State) onLaunched(ip string, t time.Time) { + s.LaunchedAt = t + s.Ip = ip + s.server.events.onLaunched() +} diff --git a/utils/collection/listings/sync_priority_slice.go b/utils/collection/listings/sync_priority_slice.go new file mode 100644 index 0000000..b1b5083 --- /dev/null +++ b/utils/collection/listings/sync_priority_slice.go @@ -0,0 +1,209 @@ +package listings + +import ( + "fmt" + "sort" + "sync" +) + +// NewSyncPrioritySlice 创建一个并发安全的优先级切片,优先级越低越靠前 +func NewSyncPrioritySlice[V any](lengthAndCap ...int) *SyncPrioritySlice[V] { + p := &SyncPrioritySlice[V]{} + if len(lengthAndCap) > 0 { + var length = lengthAndCap[0] + var c int + if len(lengthAndCap) > 1 { + c = lengthAndCap[1] + } + p.items = make([]*priorityItem[V], length, c) + } + return p +} + +// SyncPrioritySlice 是一个优先级切片,优先级越低越靠前 +type SyncPrioritySlice[V any] struct { + rw sync.RWMutex + items []*priorityItem[V] +} + +// Len 返回切片长度 +func (slf *SyncPrioritySlice[V]) Len() int { + slf.rw.RLock() + defer slf.rw.RUnlock() + return len(slf.items) +} + +// Cap 返回切片容量 +func (slf *SyncPrioritySlice[V]) Cap() int { + slf.rw.RLock() + defer slf.rw.RUnlock() + return cap(slf.items) +} + +// Clear 清空切片 +func (slf *SyncPrioritySlice[V]) Clear() { + slf.rw.Lock() + defer slf.rw.Unlock() + slf.items = slf.items[:0] +} + +// Append 添加元素 +func (slf *SyncPrioritySlice[V]) Append(v V, p int) { + slf.rw.Lock() + defer slf.rw.Unlock() + slf.items = append(slf.items, &priorityItem[V]{ + v: v, + p: p, + }) + slf.sort() +} + +// Appends 添加元素 +func (slf *SyncPrioritySlice[V]) Appends(priority int, vs ...V) { + for _, v := range vs { + slf.Append(v, priority) + } + slf.sort() +} + +// AppendByOptionalPriority 添加元素 +func (slf *SyncPrioritySlice[V]) AppendByOptionalPriority(v V, priority ...int) { + if len(priority) == 0 { + slf.Append(v, 0) + } else { + slf.Append(v, priority[0]) + } +} + +// Get 获取元素 +func (slf *SyncPrioritySlice[V]) Get(index int) (V, int) { + slf.rw.RLock() + defer slf.rw.RUnlock() + i := slf.items[index] + return i.Value(), i.Priority() +} + +// GetValue 获取元素值 +func (slf *SyncPrioritySlice[V]) GetValue(index int) V { + slf.rw.RLock() + defer slf.rw.RUnlock() + return slf.items[index].Value() +} + +// GetPriority 获取元素优先级 +func (slf *SyncPrioritySlice[V]) GetPriority(index int) int { + slf.rw.RLock() + defer slf.rw.RUnlock() + return slf.items[index].Priority() +} + +// Set 设置元素 +func (slf *SyncPrioritySlice[V]) Set(index int, value V, priority int) { + slf.rw.Lock() + defer slf.rw.Unlock() + before := slf.items[index] + slf.items[index] = &priorityItem[V]{ + v: value, + p: priority, + } + if before.Priority() != priority { + slf.sort() + } +} + +// SetValue 设置元素值 +func (slf *SyncPrioritySlice[V]) SetValue(index int, value V) { + slf.rw.Lock() + defer slf.rw.Unlock() + slf.items[index].v = value +} + +// SetPriority 设置元素优先级 +func (slf *SyncPrioritySlice[V]) SetPriority(index int, priority int) { + slf.rw.Lock() + defer slf.rw.Unlock() + slf.items[index].p = priority + slf.sort() +} + +// Action 直接操作切片,如果返回值不为 nil,则替换切片 +func (slf *SyncPrioritySlice[V]) Action(action func(items []*priorityItem[V]) []*priorityItem[V]) { + slf.rw.Lock() + defer slf.rw.Unlock() + if len(slf.items) == 0 { + return + } + if replace := action(slf.items); replace != nil { + slf.items = replace + slf.sort() + } +} + +// Range 遍历切片,如果返回值为 false,则停止遍历 +func (slf *SyncPrioritySlice[V]) Range(action func(index int, item *priorityItem[V]) bool) { + slf.rw.RLock() + defer slf.rw.RUnlock() + for i, item := range slf.items { + if !action(i, item) { + break + } + } +} + +// RangeValue 遍历切片值,如果返回值为 false,则停止遍历 +func (slf *SyncPrioritySlice[V]) RangeValue(action func(index int, value V) bool) { + slf.Range(func(index int, item *priorityItem[V]) bool { + return action(index, item.Value()) + }) +} + +// RangePriority 遍历切片优先级,如果返回值为 false,则停止遍历 +func (slf *SyncPrioritySlice[V]) RangePriority(action func(index int, priority int) bool) { + slf.Range(func(index int, item *priorityItem[V]) bool { + return action(index, item.Priority()) + }) +} + +// Slice 返回切片 +func (slf *SyncPrioritySlice[V]) Slice() []V { + slf.rw.RLock() + defer slf.rw.RUnlock() + var vs []V + for _, item := range slf.items { + vs = append(vs, item.Value()) + } + return vs +} + +// String 返回切片字符串 +func (slf *SyncPrioritySlice[V]) String() string { + slf.rw.RLock() + defer slf.rw.RUnlock() + var vs []V + for _, item := range slf.items { + vs = append(vs, item.Value()) + } + return fmt.Sprint(vs) +} + +// sort 排序 +func (slf *SyncPrioritySlice[V]) sort() { + if len(slf.items) <= 1 { + return + } + sort.Slice(slf.items, func(i, j int) bool { + return slf.items[i].Priority() < slf.items[j].Priority() + }) + for i := 0; i < len(slf.items); i++ { + if i == 0 { + slf.items[i].prev = nil + slf.items[i].next = slf.items[i+1] + } else if i == len(slf.items)-1 { + slf.items[i].prev = slf.items[i-1] + slf.items[i].next = nil + } else { + slf.items[i].prev = slf.items[i-1] + slf.items[i].next = slf.items[i+1] + } + } +} diff --git a/utils/super/ip.go b/utils/super/ip.go new file mode 100644 index 0000000..3921279 --- /dev/null +++ b/utils/super/ip.go @@ -0,0 +1,18 @@ +package super + +import ( + "net" +) + +// IP 返回本机出站地址 +func IP() (ip net.IP, err error) { + var conn net.Conn + conn, err = net.Dial("udp", "8.8.8.8:80") + if err != nil { + return + } + _ = conn.Close() + localAddr := conn.LocalAddr().(*net.UDPAddr) + ip = localAddr.IP + return +}