other: 跨队列消息

This commit is contained in:
kercylan98 2024-04-07 19:14:07 +08:00
parent 35e13d9cd5
commit 409350f530
2 changed files with 39 additions and 6 deletions

View File

@ -213,3 +213,37 @@ func (m *connAsyncCallbackMessage) OnProcess() {
m.handler(m.controller.GetServer(), m.conn, m.err)
}
}
// GenerateCrossQueueMessage 生成跨队列消息,该消息将会把消息传入对应 ident 所在队列进行处理,并在处理完成时进行回调
func GenerateCrossQueueMessage(targetIdent string, handler func(srv Server), callback func(srv Server)) Message {
return &crossQueueMessage{
targetIdent: targetIdent,
handler: handler,
callback: callback,
}
}
type crossQueueMessage struct {
controller Controller
message queue.MessageWrapper[int, string, Message]
handler func(srv Server)
callback func(srv Server)
targetIdent string
}
func (m *crossQueueMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) {
m.controller = controller
m.message = message
}
func (m *crossQueueMessage) OnProcess() {
m.controller.PushIdentMessage(m.targetIdent, GenerateSystemSyncMessage(func(srv Server) {
m.handler(srv)
if m.message.HasIdent() {
m.controller.PushIdentMessage(m.message.Ident(), GenerateSystemSyncMessage(m.callback))
} else {
m.controller.PushSystemMessage(GenerateSystemSyncMessage(m.callback))
}
}))
}

View File

@ -30,20 +30,19 @@ func TestNewServer(t *testing.T) {
var tm = make(map[string]bool)
srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) {
conn.SetActor("12321")
conn.SetActor(random.HostName())
if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil {
t.Error(err)
}
conn.PushAsyncMessage(func(srv server.Server, conn server.Conn) error {
time.Sleep(time.Second * 5)
return nil
}, func(srv server.Server, conn server.Conn, err error) {
conn.PushMessage(server.GenerateCrossQueueMessage("target", func(srv server.Server) {
for i := 0; i < 10000000; i++ {
_ = tm["1"]
tm["1"] = random.Bool()
}
})
}, func(srv server.Server) {
}))
})
srv.RegisterConnectionReceivePacketEvent(func(srv server.Server, conn server.Conn, packet server.Packet) {