diff --git a/server/internal/dispatcher/dispatcher.go b/server/internal/dispatcher/dispatcher.go index c859c13..985e28b 100644 --- a/server/internal/dispatcher/dispatcher.go +++ b/server/internal/dispatcher/dispatcher.go @@ -116,7 +116,7 @@ func (d *Dispatcher[P, M]) Expel() { func (d *Dispatcher[P, M]) noLockExpel() { d.expel = true if d.mc <= 0 { - d.abort <- struct{}{} + close(d.abort) } } @@ -139,7 +139,7 @@ func (d *Dispatcher[P, M]) IncrCount(producer P, i int64) { d.mc += i d.pmc[producer] += i if d.expel && d.mc <= 0 { - d.abort <- struct{}{} + close(d.abort) } } @@ -190,7 +190,6 @@ func (d *Dispatcher[P, M]) Start() *Dispatcher[P, M] { if ch := d.closedHandler.Load(); ch != nil { (*ch)(&Action[P, M]{d: d, unlock: true}) } - close(d.abort) }(d) return d } diff --git a/server/server.go b/server/server.go index e03d445..7d7871d 100644 --- a/server/server.go +++ b/server/server.go @@ -191,11 +191,20 @@ func (srv *Server) shutdown(err error) { log.Error("Server", log.String("state", "shutdown"), log.Err(err)) } + var infoCount int for srv.messageCounter.Load() > 0 { - log.Info("Server", log.Any("network", srv.network), log.String("listen", srv.addr), - log.String("action", "shutdown"), log.String("state", "waiting"), log.Int64("message", srv.messageCounter.Load())) + if infoCount%10 == 0 { + log.Info("Server", + log.Any("network", srv.network), + log.String("listen", srv.addr), + log.String("action", "shutdown"), + log.String("state", "waiting"), + log.Int64("message", srv.messageCounter.Load())) + } time.Sleep(time.Second) + infoCount++ } + srv.dispatcherMgr.Wait() if srv.multiple == nil { srv.OnStopEvent() }