From 90b7e4c1f8dad5f60ac60d250a6b3cd1ab03cabf Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Fri, 12 Jan 2024 15:52:59 +0800 Subject: [PATCH] =?UTF-8?q?test:=20dispatcher=20=E5=8C=85=E5=AE=8C?= =?UTF-8?q?=E5=96=84=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispatcher/dispatcher_example_test.go | 37 +++ server/internal/dispatcher/dispatcher_test.go | 156 ++++++++++- server/internal/dispatcher/manager_test.go | 251 +++++++++++++++--- 3 files changed, 401 insertions(+), 43 deletions(-) create mode 100644 server/internal/dispatcher/dispatcher_example_test.go diff --git a/server/internal/dispatcher/dispatcher_example_test.go b/server/internal/dispatcher/dispatcher_example_test.go new file mode 100644 index 0000000..2a9bef6 --- /dev/null +++ b/server/internal/dispatcher/dispatcher_example_test.go @@ -0,0 +1,37 @@ +package dispatcher_test + +import ( + "fmt" + "github.com/kercylan98/minotaur/server/internal/dispatcher" + "sync" + "sync/atomic" +) + +func ExampleNewDispatcher() { + m := new(atomic.Int64) + fm := new(atomic.Int64) + w := new(sync.WaitGroup) + w.Add(1) + d := dispatcher.NewDispatcher(1024, "example-dispatcher", func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) { + m.Add(1) + }) + d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) { + w.Done() + }) + var producers = []string{"producer1", "producer2", "producer3"} + for i := 0; i < len(producers); i++ { + p := producers[i] + for i := 0; i < 10; i++ { + d.Put(&TestMessage{producer: p}) + } + d.SetProducerDoneHandler(p, func(p string, dispatcher *dispatcher.Action[string, *TestMessage]) { + fm.Add(1) + }) + } + d.Start() + d.Expel() + w.Wait() + fmt.Println(fmt.Sprintf("producer num: %d, producer done: %d, finished: %d", len(producers), fm.Load(), m.Load())) + // Output: + // producer num: 3, producer done: 3, finished: 30 +} diff --git a/server/internal/dispatcher/dispatcher_test.go b/server/internal/dispatcher/dispatcher_test.go index 74ca683..2233998 100644 --- a/server/internal/dispatcher/dispatcher_test.go +++ b/server/internal/dispatcher/dispatcher_test.go @@ -31,6 +31,7 @@ func TestNewDispatcher(t *testing.T) { } for _, c := range cases { + c := c t.Run(c.name, func(t *testing.T) { defer func() { if r := recover(); r != nil && !c.shouldPanic { @@ -54,23 +55,24 @@ func TestDispatcher_SetProducerDoneHandler(t *testing.T) { } for _, c := range cases { + c := c t.Run(c.name, func(t *testing.T) { c.messageFinish = &atomic.Bool{} w := new(sync.WaitGroup) d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) { w.Done() }) - d.SetProducerDoneHandler("producer", func(p string, dispatcher *dispatcher.Dispatcher[string, *TestMessage]) { + d.Put(&TestMessage{producer: c.producer}) + d.SetProducerDoneHandler(c.producer, func(p string, dispatcher *dispatcher.Action[string, *TestMessage]) { c.messageFinish.Store(true) }) if c.cancel { - d.SetProducerDoneHandler("producer", nil) + d.SetProducerDoneHandler(c.producer, nil) } - d.Put(&TestMessage{producer: "producer"}) w.Add(1) d.Start() w.Wait() - if c.messageFinish.Load() && c.cancel { + if c.cancel && c.messageFinish.Load() { t.Errorf("%s should cancel, but not", c.name) } }) @@ -90,6 +92,7 @@ func TestDispatcher_SetClosedHandler(t *testing.T) { } for _, c := range cases { + c := c t.Run(c.name, func(t *testing.T) { c.handlerFinishMsgCount = &atomic.Int64{} w := new(sync.WaitGroup) @@ -97,7 +100,7 @@ func TestDispatcher_SetClosedHandler(t *testing.T) { time.Sleep(c.msgTime) c.handlerFinishMsgCount.Add(1) }) - d.SetClosedHandler(func(dispatcher *dispatcher.Dispatcher[string, *TestMessage]) { + d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) { w.Done() }) for i := 0; i < c.msgCount; i++ { @@ -114,6 +117,41 @@ func TestDispatcher_SetClosedHandler(t *testing.T) { } } +func TestIncrCount(t *testing.T) { + var cases = []struct { + name string + producer string + messageDone *atomic.Bool + }{ + {name: "TestIncrCount_Normal", producer: "producer"}, + } + + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + c.messageDone = &atomic.Bool{} + w := new(sync.WaitGroup) + w.Add(1) + var d *dispatcher.Dispatcher[string, *TestMessage] + d = dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) { + c.messageDone.Store(true) + d.IncrCount(c.producer, -1) + }) + d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) { + w.Done() + }) + d.Start() + d.IncrCount(c.producer, 1) + d.Expel() + d.Put(&TestMessage{producer: c.producer}) + w.Wait() + if !c.messageDone.Load() { + t.Errorf("%s should done, but not", c.name) + } + }) + } +} + func TestDispatcher_Expel(t *testing.T) { var cases = []struct { name string @@ -127,6 +165,7 @@ func TestDispatcher_Expel(t *testing.T) { } for _, c := range cases { + c := c t.Run(c.name, func(t *testing.T) { c.handlerFinishMsgCount = &atomic.Int64{} w := new(sync.WaitGroup) @@ -134,7 +173,7 @@ func TestDispatcher_Expel(t *testing.T) { time.Sleep(c.msgTime) c.handlerFinishMsgCount.Add(1) }) - d.SetClosedHandler(func(dispatcher *dispatcher.Dispatcher[string, *TestMessage]) { + d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) { w.Done() }) for i := 0; i < c.msgCount; i++ { @@ -163,13 +202,14 @@ func TestDispatcher_UnExpel(t *testing.T) { } for _, c := range cases { + c := c t.Run(c.name, func(t *testing.T) { c.closed = &atomic.Bool{} w := new(sync.WaitGroup) d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) { w.Done() }) - d.SetClosedHandler(func(dispatcher *dispatcher.Dispatcher[string, *TestMessage]) { + d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) { c.closed.Store(true) }) d.Put(&TestMessage{producer: "producer"}) @@ -188,3 +228,105 @@ func TestDispatcher_UnExpel(t *testing.T) { }) } } + +func TestDispatcher_Put(t *testing.T) { + var cases = []struct { + name string + producer string + messageDone *atomic.Bool + }{ + {name: "TestDispatcher_Put_Normal", producer: "producer"}, + } + + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + c.messageDone = &atomic.Bool{} + w := new(sync.WaitGroup) + w.Add(1) + d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) { + c.messageDone.Store(true) + w.Done() + }) + d.Start() + d.Put(&TestMessage{producer: c.producer}) + d.Expel() + w.Wait() + if !c.messageDone.Load() { + t.Errorf("%s should done, but not", c.name) + } + }) + } +} + +func TestDispatcher_Start(t *testing.T) { + var cases = []struct { + name string + producer string + messageDone *atomic.Bool + }{ + {name: "TestDispatcher_Start_Normal", producer: "producer"}, + } + + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + c.messageDone = &atomic.Bool{} + w := new(sync.WaitGroup) + w.Add(1) + d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) { + c.messageDone.Store(true) + w.Done() + }) + d.Start() + d.Put(&TestMessage{producer: c.producer}) + d.Expel() + w.Wait() + if !c.messageDone.Load() { + t.Errorf("%s should done, but not", c.name) + } + }) + } +} + +func TestDispatcher_Name(t *testing.T) { + var cases = []struct { + name string + }{ + {name: "TestDispatcher_Name_Normal"}, + } + + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}) + if d.Name() != c.name { + t.Errorf("%s should equal %s, but not", c.name, c.name) + } + }) + } +} + +func TestDispatcher_Closed(t *testing.T) { + var cases = []struct { + name string + }{ + {name: "TestDispatcher_Closed_Normal"}, + } + + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + w := new(sync.WaitGroup) + w.Add(1) + d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}) + d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) { w.Done() }) + d.Start() + d.Expel() + w.Wait() + if !d.Closed() { + t.Errorf("%s should closed, but not", c.name) + } + }) + } +} diff --git a/server/internal/dispatcher/manager_test.go b/server/internal/dispatcher/manager_test.go index ad660b6..77b93ae 100644 --- a/server/internal/dispatcher/manager_test.go +++ b/server/internal/dispatcher/manager_test.go @@ -1,46 +1,225 @@ package dispatcher_test import ( + "fmt" "github.com/kercylan98/minotaur/server/internal/dispatcher" - "github.com/kercylan98/minotaur/utils/times" + "sync/atomic" "testing" - "time" ) -func TestManager(t *testing.T) { - var mgr *dispatcher.Manager[string, *TestMessage] - var onHandler = func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) { - t.Log(dispatcher.Name(), message, mgr.GetDispatcherNum()) - switch message.v { - case 4: - mgr.UnBindProducer("test") - t.Log("UnBindProducer") - case 6: - mgr.BindProducer(message.GetProducer(), "test-dispatcher") - t.Log("BindProducer") - case 9: - dispatcher.Put(&TestMessage{ - producer: "test", - v: 10, - }) - case 10: - mgr.UnBindProducer("test") - t.Log("UnBindProducer", mgr.GetDispatcherNum()) - } - - } - mgr = dispatcher.NewManager[string, *TestMessage](1024*16, onHandler) - - mgr.BindProducer("test", "test-dispatcher") - for i := 0; i < 10; i++ { - d := mgr.GetDispatcher("test").SetClosedHandler(func(dispatcher *dispatcher.Dispatcher[string, *TestMessage]) { - t.Log("closed") - }) - d.Put(&TestMessage{ - producer: "test", - v: i, - }) +func TestNewManager(t *testing.T) { + var cases = []struct { + name string + bufferSize int + handler dispatcher.Handler[string, *TestMessage] + shouldPanic bool + }{ + {name: "TestNewManager_BufferSize0AndHandlerNil", bufferSize: 0, handler: nil, shouldPanic: true}, + {name: "TestNewManager_BufferSize0AndHandlerNotNil", bufferSize: 0, handler: func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}, shouldPanic: true}, + {name: "TestNewManager_BufferSize1AndHandlerNil", bufferSize: 1, handler: nil, shouldPanic: true}, + {name: "TestNewManager_BufferSize1AndHandlerNotNil", bufferSize: 1, handler: func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}, shouldPanic: false}, } - time.Sleep(times.Day) + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + defer func() { + if r := recover(); r != nil && !c.shouldPanic { + t.Errorf("NewManager() should not panic, but panic: %v", r) + } + }() + dispatcher.NewManager[string, *TestMessage](c.bufferSize, c.handler) + }) + } +} + +func TestManager_SetDispatcherClosedHandler(t *testing.T) { + var cases = []struct { + name string + setCloseHandler bool + }{ + {name: "TestManager_SetDispatcherClosedHandler_Set", setCloseHandler: true}, + {name: "TestManager_SetDispatcherClosedHandler_NotSet", setCloseHandler: false}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + var closed atomic.Bool + m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}) + if c.setCloseHandler { + m.SetDispatcherClosedHandler(func(name string) { + closed.Store(true) + }) + } + m.BindProducer(c.name, c.name) + m.UnBindProducer(c.name) + m.Wait() + if c.setCloseHandler && !closed.Load() { + t.Errorf("SetDispatcherClosedHandler() should be called") + } + + }) + } +} + +func TestManager_SetDispatcherCreatedHandler(t *testing.T) { + var cases = []struct { + name string + setCreatedHandler bool + }{ + {name: "TestManager_SetDispatcherCreatedHandler_Set", setCreatedHandler: true}, + {name: "TestManager_SetDispatcherCreatedHandler_NotSet", setCreatedHandler: false}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + var created atomic.Bool + m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}) + if c.setCreatedHandler { + m.SetDispatcherCreatedHandler(func(name string) { + created.Store(true) + }) + } + m.BindProducer(c.name, c.name) + m.UnBindProducer(c.name) + m.Wait() + if c.setCreatedHandler && !created.Load() { + t.Errorf("SetDispatcherCreatedHandler() should be called") + } + + }) + } +} + +func TestManager_HasDispatcher(t *testing.T) { + var cases = []struct { + name string + bindName string + has bool + }{ + {name: "TestManager_HasDispatcher_Has", bindName: "TestManager_HasDispatcher_Has", has: true}, + {name: "TestManager_HasDispatcher_NotHas", bindName: "TestManager_HasDispatcher_NotHas", has: false}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}) + m.BindProducer(c.bindName, c.bindName) + var cond string + if c.has { + cond = c.bindName + } + if m.HasDispatcher(cond) != c.has { + t.Errorf("HasDispatcher() should return %v", c.has) + } + }) + } +} + +func TestManager_GetDispatcherNum(t *testing.T) { + var cases = []struct { + name string + num int + }{ + {name: "TestManager_GetDispatcherNum_N1", num: -1}, + {name: "TestManager_GetDispatcherNum_0", num: 0}, + {name: "TestManager_GetDispatcherNum_1", num: 1}, + {name: "TestManager_GetDispatcherNum_2", num: 2}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}) + switch { + case c.num <= 0: + return + case c.num == 1: + if m.GetDispatcherNum() != 1 { + t.Errorf("GetDispatcherNum() should return 1") + } + return + default: + for i := 0; i < c.num-1; i++ { + m.BindProducer(fmt.Sprintf("%s_%d", c.name, i), fmt.Sprintf("%s_%d", c.name, i)) + } + if m.GetDispatcherNum() != c.num { + t.Errorf("GetDispatcherNum() should return %v", c.num) + } + } + }) + } +} + +func TestManager_GetSystemDispatcher(t *testing.T) { + var cases = []struct { + name string + }{ + {name: "TestManager_GetSystemDispatcher"}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}) + if m.GetSystemDispatcher() == nil { + t.Errorf("GetSystemDispatcher() should not return nil") + } + }) + } +} + +func TestManager_GetDispatcher(t *testing.T) { + var cases = []struct { + name string + bindName string + }{ + {name: "TestManager_GetDispatcher", bindName: "TestManager_GetDispatcher"}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}) + m.BindProducer(c.bindName, c.bindName) + if m.GetDispatcher(c.bindName) == nil { + t.Errorf("GetDispatcher() should not return nil") + } + }) + } +} + +func TestManager_BindProducer(t *testing.T) { + var cases = []struct { + name string + bindName string + }{ + {name: "TestManager_BindProducer", bindName: "TestManager_BindProducer"}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}) + m.BindProducer(c.bindName, c.bindName) + if m.GetDispatcher(c.bindName) == nil { + t.Errorf("GetDispatcher() should not return nil") + } + }) + } +} + +func TestManager_UnBindProducer(t *testing.T) { + var cases = []struct { + name string + bindName string + }{ + {name: "TestManager_UnBindProducer", bindName: "TestManager_UnBindProducer"}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}) + m.BindProducer(c.bindName, c.bindName) + m.UnBindProducer(c.bindName) + if m.GetDispatcher(c.bindName) != m.GetSystemDispatcher() { + t.Errorf("GetDispatcher() should return SystemDispatcher") + } + }) + } }