From 409350f530ca370c84e43d2b99df0516c63c56a8 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Sun, 7 Apr 2024 19:14:07 +0800 Subject: [PATCH] =?UTF-8?q?other:=20=E8=B7=A8=E9=98=9F=E5=88=97=E6=B6=88?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/internal/v2/message.go | 34 +++++++++++++++++++++++++++++++ server/internal/v2/server_test.go | 11 +++++----- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/server/internal/v2/message.go b/server/internal/v2/message.go index b466c32..5ea437b 100644 --- a/server/internal/v2/message.go +++ b/server/internal/v2/message.go @@ -213,3 +213,37 @@ func (m *connAsyncCallbackMessage) OnProcess() { m.handler(m.controller.GetServer(), m.conn, m.err) } } + +// GenerateCrossQueueMessage 生成跨队列消息,该消息将会把消息传入对应 ident 所在队列进行处理,并在处理完成时进行回调 +func GenerateCrossQueueMessage(targetIdent string, handler func(srv Server), callback func(srv Server)) Message { + return &crossQueueMessage{ + targetIdent: targetIdent, + handler: handler, + callback: callback, + } +} + +type crossQueueMessage struct { + controller Controller + message queue.MessageWrapper[int, string, Message] + handler func(srv Server) + callback func(srv Server) + targetIdent string +} + +func (m *crossQueueMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { + m.controller = controller + m.message = message +} + +func (m *crossQueueMessage) OnProcess() { + m.controller.PushIdentMessage(m.targetIdent, GenerateSystemSyncMessage(func(srv Server) { + m.handler(srv) + + if m.message.HasIdent() { + m.controller.PushIdentMessage(m.message.Ident(), GenerateSystemSyncMessage(m.callback)) + } else { + m.controller.PushSystemMessage(GenerateSystemSyncMessage(m.callback)) + } + })) +} diff --git a/server/internal/v2/server_test.go b/server/internal/v2/server_test.go index 8eeda7b..bff33f4 100644 --- a/server/internal/v2/server_test.go +++ b/server/internal/v2/server_test.go @@ -30,20 +30,19 @@ func TestNewServer(t *testing.T) { var tm = make(map[string]bool) srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) { - conn.SetActor("12321") + conn.SetActor(random.HostName()) if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil { t.Error(err) } - conn.PushAsyncMessage(func(srv server.Server, conn server.Conn) error { - time.Sleep(time.Second * 5) - return nil - }, func(srv server.Server, conn server.Conn, err error) { + conn.PushMessage(server.GenerateCrossQueueMessage("target", func(srv server.Server) { for i := 0; i < 10000000; i++ { _ = tm["1"] tm["1"] = random.Bool() } - }) + }, func(srv server.Server) { + + })) }) srv.RegisterConnectionReceivePacketEvent(func(srv server.Server, conn server.Conn, packet server.Packet) {