other: server 异步消息回调将不再使用 MessageTypeSystem,更改为 MessageTypeAsyncCallback

This commit is contained in:
kercylan98 2023-08-21 18:48:52 +08:00
parent 7cbe5c4805
commit 811e1bd29e
2 changed files with 23 additions and 10 deletions

View File

@ -22,17 +22,21 @@ const (
// MessageTypeAsync 异步消息类型 // MessageTypeAsync 异步消息类型
MessageTypeAsync MessageTypeAsync
// MessageTypeAsyncCallback 异步回调消息类型
MessageTypeAsyncCallback
// MessageTypeSystem 系统消息类型 // MessageTypeSystem 系统消息类型
MessageTypeSystem MessageTypeSystem
) )
var messageNames = map[MessageType]string{ var messageNames = map[MessageType]string{
MessageTypePacket: "MessageTypePacket", MessageTypePacket: "MessageTypePacket",
MessageTypeError: "MessageTypeError", MessageTypeError: "MessageTypeError",
MessageTypeCross: "MessageTypeCross", MessageTypeCross: "MessageTypeCross",
MessageTypeTicker: "MessageTypeTicker", MessageTypeTicker: "MessageTypeTicker",
MessageTypeAsync: "MessageTypeAsync", MessageTypeAsync: "MessageTypeAsync",
MessageTypeSystem: "MessageTypeSystem", MessageTypeAsyncCallback: "MessageTypeAsyncCallback",
MessageTypeSystem: "MessageTypeSystem",
} }
const ( const (
@ -146,6 +150,8 @@ func PushTickerMessage(srv *Server, caller func(), mark ...any) {
// - 异步消息将在服务器的异步消息队列中进行处理,处理完成 caller 的阻塞操作后,将会通过系统消息执行 callback 函数 // - 异步消息将在服务器的异步消息队列中进行处理,处理完成 caller 的阻塞操作后,将会通过系统消息执行 callback 函数
// - callback 函数将在异步消息处理完成后进行调用,无论过程是否产生 err都将被执行允许为 nil // - callback 函数将在异步消息处理完成后进行调用,无论过程是否产生 err都将被执行允许为 nil
// - 需要注意的是为了避免并发问题caller 函数请仅处理阻塞操作,其他操作应该在 callback 函数中进行 // - 需要注意的是为了避免并发问题caller 函数请仅处理阻塞操作,其他操作应该在 callback 函数中进行
//
// 在通过 WithShunt 使用分流服务器时,异步消息不会转换到分流通道中进行处理。依旧需要注意上方第三条
func PushAsyncMessage(srv *Server, caller func() error, callback func(err error), mark ...any) { func PushAsyncMessage(srv *Server, caller func() error, callback func(err error), mark ...any) {
msg := srv.messagePool.Get() msg := srv.messagePool.Get()
msg.t = MessageTypeAsync msg.t = MessageTypeAsync

View File

@ -545,7 +545,7 @@ func (slf *Server) pushMessage(message *Message) {
if slf.isShutdown.Load() { if slf.isShutdown.Load() {
return return
} }
if slf.shuntChannels != nil && (message.t == MessageTypePacket) { if slf.shuntChannels != nil && message.t == MessageTypePacket {
conn := message.attrs[0].(*Conn) conn := message.attrs[0].(*Conn)
channelGuid, allowToCreate := slf.shuntMatcher(conn) channelGuid, allowToCreate := slf.shuntMatcher(conn)
channel, exist := slf.shuntChannels.GetExist(channelGuid) channel, exist := slf.shuntChannels.GetExist(channelGuid)
@ -681,15 +681,22 @@ func (slf *Server) dispatchMessage(msg *Message) {
}() }()
err := handle() err := handle()
if cb && callback != nil { if cb && callback != nil {
PushSystemMessage(slf, func() { acm := slf.messagePool.Get()
callback(err) acm.t = MessageTypeAsyncCallback
}, "AsyncCallback") if len(attrs) > 2 {
acm.attrs = append([]any{func() { callback(err) }}, attrs[2:]...)
} else {
acm.attrs = []any{func() { callback(err) }}
}
slf.pushMessage(acm)
} else if err != nil { } else if err != nil {
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", string(debug.Stack()))) log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", string(debug.Stack())))
} }
}); err != nil { }); err != nil {
panic(err) panic(err)
} }
case MessageTypeAsyncCallback:
attrs[0].(func())()
case MessageTypeSystem: case MessageTypeSystem:
attrs[0].(func())() attrs[0].(func())()
default: default: