test: dispatcher 包完善测试用例
This commit is contained in:
parent
7528dc4a1b
commit
90b7e4c1f8
|
@ -0,0 +1,37 @@
|
|||
package dispatcher_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/server/internal/dispatcher"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
func ExampleNewDispatcher() {
|
||||
m := new(atomic.Int64)
|
||||
fm := new(atomic.Int64)
|
||||
w := new(sync.WaitGroup)
|
||||
w.Add(1)
|
||||
d := dispatcher.NewDispatcher(1024, "example-dispatcher", func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
|
||||
m.Add(1)
|
||||
})
|
||||
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
|
||||
w.Done()
|
||||
})
|
||||
var producers = []string{"producer1", "producer2", "producer3"}
|
||||
for i := 0; i < len(producers); i++ {
|
||||
p := producers[i]
|
||||
for i := 0; i < 10; i++ {
|
||||
d.Put(&TestMessage{producer: p})
|
||||
}
|
||||
d.SetProducerDoneHandler(p, func(p string, dispatcher *dispatcher.Action[string, *TestMessage]) {
|
||||
fm.Add(1)
|
||||
})
|
||||
}
|
||||
d.Start()
|
||||
d.Expel()
|
||||
w.Wait()
|
||||
fmt.Println(fmt.Sprintf("producer num: %d, producer done: %d, finished: %d", len(producers), fm.Load(), m.Load()))
|
||||
// Output:
|
||||
// producer num: 3, producer done: 3, finished: 30
|
||||
}
|
|
@ -31,6 +31,7 @@ func TestNewDispatcher(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil && !c.shouldPanic {
|
||||
|
@ -54,23 +55,24 @@ func TestDispatcher_SetProducerDoneHandler(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
c.messageFinish = &atomic.Bool{}
|
||||
w := new(sync.WaitGroup)
|
||||
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
|
||||
w.Done()
|
||||
})
|
||||
d.SetProducerDoneHandler("producer", func(p string, dispatcher *dispatcher.Dispatcher[string, *TestMessage]) {
|
||||
d.Put(&TestMessage{producer: c.producer})
|
||||
d.SetProducerDoneHandler(c.producer, func(p string, dispatcher *dispatcher.Action[string, *TestMessage]) {
|
||||
c.messageFinish.Store(true)
|
||||
})
|
||||
if c.cancel {
|
||||
d.SetProducerDoneHandler("producer", nil)
|
||||
d.SetProducerDoneHandler(c.producer, nil)
|
||||
}
|
||||
d.Put(&TestMessage{producer: "producer"})
|
||||
w.Add(1)
|
||||
d.Start()
|
||||
w.Wait()
|
||||
if c.messageFinish.Load() && c.cancel {
|
||||
if c.cancel && c.messageFinish.Load() {
|
||||
t.Errorf("%s should cancel, but not", c.name)
|
||||
}
|
||||
})
|
||||
|
@ -90,6 +92,7 @@ func TestDispatcher_SetClosedHandler(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
c.handlerFinishMsgCount = &atomic.Int64{}
|
||||
w := new(sync.WaitGroup)
|
||||
|
@ -97,7 +100,7 @@ func TestDispatcher_SetClosedHandler(t *testing.T) {
|
|||
time.Sleep(c.msgTime)
|
||||
c.handlerFinishMsgCount.Add(1)
|
||||
})
|
||||
d.SetClosedHandler(func(dispatcher *dispatcher.Dispatcher[string, *TestMessage]) {
|
||||
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
|
||||
w.Done()
|
||||
})
|
||||
for i := 0; i < c.msgCount; i++ {
|
||||
|
@ -114,6 +117,41 @@ func TestDispatcher_SetClosedHandler(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestIncrCount(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
producer string
|
||||
messageDone *atomic.Bool
|
||||
}{
|
||||
{name: "TestIncrCount_Normal", producer: "producer"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
c.messageDone = &atomic.Bool{}
|
||||
w := new(sync.WaitGroup)
|
||||
w.Add(1)
|
||||
var d *dispatcher.Dispatcher[string, *TestMessage]
|
||||
d = dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
|
||||
c.messageDone.Store(true)
|
||||
d.IncrCount(c.producer, -1)
|
||||
})
|
||||
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
|
||||
w.Done()
|
||||
})
|
||||
d.Start()
|
||||
d.IncrCount(c.producer, 1)
|
||||
d.Expel()
|
||||
d.Put(&TestMessage{producer: c.producer})
|
||||
w.Wait()
|
||||
if !c.messageDone.Load() {
|
||||
t.Errorf("%s should done, but not", c.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_Expel(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
|
@ -127,6 +165,7 @@ func TestDispatcher_Expel(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
c.handlerFinishMsgCount = &atomic.Int64{}
|
||||
w := new(sync.WaitGroup)
|
||||
|
@ -134,7 +173,7 @@ func TestDispatcher_Expel(t *testing.T) {
|
|||
time.Sleep(c.msgTime)
|
||||
c.handlerFinishMsgCount.Add(1)
|
||||
})
|
||||
d.SetClosedHandler(func(dispatcher *dispatcher.Dispatcher[string, *TestMessage]) {
|
||||
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
|
||||
w.Done()
|
||||
})
|
||||
for i := 0; i < c.msgCount; i++ {
|
||||
|
@ -163,13 +202,14 @@ func TestDispatcher_UnExpel(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
c.closed = &atomic.Bool{}
|
||||
w := new(sync.WaitGroup)
|
||||
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
|
||||
w.Done()
|
||||
})
|
||||
d.SetClosedHandler(func(dispatcher *dispatcher.Dispatcher[string, *TestMessage]) {
|
||||
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
|
||||
c.closed.Store(true)
|
||||
})
|
||||
d.Put(&TestMessage{producer: "producer"})
|
||||
|
@ -188,3 +228,105 @@ func TestDispatcher_UnExpel(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_Put(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
producer string
|
||||
messageDone *atomic.Bool
|
||||
}{
|
||||
{name: "TestDispatcher_Put_Normal", producer: "producer"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
c.messageDone = &atomic.Bool{}
|
||||
w := new(sync.WaitGroup)
|
||||
w.Add(1)
|
||||
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
|
||||
c.messageDone.Store(true)
|
||||
w.Done()
|
||||
})
|
||||
d.Start()
|
||||
d.Put(&TestMessage{producer: c.producer})
|
||||
d.Expel()
|
||||
w.Wait()
|
||||
if !c.messageDone.Load() {
|
||||
t.Errorf("%s should done, but not", c.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_Start(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
producer string
|
||||
messageDone *atomic.Bool
|
||||
}{
|
||||
{name: "TestDispatcher_Start_Normal", producer: "producer"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
c.messageDone = &atomic.Bool{}
|
||||
w := new(sync.WaitGroup)
|
||||
w.Add(1)
|
||||
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
|
||||
c.messageDone.Store(true)
|
||||
w.Done()
|
||||
})
|
||||
d.Start()
|
||||
d.Put(&TestMessage{producer: c.producer})
|
||||
d.Expel()
|
||||
w.Wait()
|
||||
if !c.messageDone.Load() {
|
||||
t.Errorf("%s should done, but not", c.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_Name(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
}{
|
||||
{name: "TestDispatcher_Name_Normal"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
|
||||
if d.Name() != c.name {
|
||||
t.Errorf("%s should equal %s, but not", c.name, c.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_Closed(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
}{
|
||||
{name: "TestDispatcher_Closed_Normal"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
w := new(sync.WaitGroup)
|
||||
w.Add(1)
|
||||
d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
|
||||
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) { w.Done() })
|
||||
d.Start()
|
||||
d.Expel()
|
||||
w.Wait()
|
||||
if !d.Closed() {
|
||||
t.Errorf("%s should closed, but not", c.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,46 +1,225 @@
|
|||
package dispatcher_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/server/internal/dispatcher"
|
||||
"github.com/kercylan98/minotaur/utils/times"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestManager(t *testing.T) {
|
||||
var mgr *dispatcher.Manager[string, *TestMessage]
|
||||
var onHandler = func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
|
||||
t.Log(dispatcher.Name(), message, mgr.GetDispatcherNum())
|
||||
switch message.v {
|
||||
case 4:
|
||||
mgr.UnBindProducer("test")
|
||||
t.Log("UnBindProducer")
|
||||
case 6:
|
||||
mgr.BindProducer(message.GetProducer(), "test-dispatcher")
|
||||
t.Log("BindProducer")
|
||||
case 9:
|
||||
dispatcher.Put(&TestMessage{
|
||||
producer: "test",
|
||||
v: 10,
|
||||
})
|
||||
case 10:
|
||||
mgr.UnBindProducer("test")
|
||||
t.Log("UnBindProducer", mgr.GetDispatcherNum())
|
||||
}
|
||||
|
||||
}
|
||||
mgr = dispatcher.NewManager[string, *TestMessage](1024*16, onHandler)
|
||||
|
||||
mgr.BindProducer("test", "test-dispatcher")
|
||||
for i := 0; i < 10; i++ {
|
||||
d := mgr.GetDispatcher("test").SetClosedHandler(func(dispatcher *dispatcher.Dispatcher[string, *TestMessage]) {
|
||||
t.Log("closed")
|
||||
})
|
||||
d.Put(&TestMessage{
|
||||
producer: "test",
|
||||
v: i,
|
||||
})
|
||||
func TestNewManager(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
bufferSize int
|
||||
handler dispatcher.Handler[string, *TestMessage]
|
||||
shouldPanic bool
|
||||
}{
|
||||
{name: "TestNewManager_BufferSize0AndHandlerNil", bufferSize: 0, handler: nil, shouldPanic: true},
|
||||
{name: "TestNewManager_BufferSize0AndHandlerNotNil", bufferSize: 0, handler: func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}, shouldPanic: true},
|
||||
{name: "TestNewManager_BufferSize1AndHandlerNil", bufferSize: 1, handler: nil, shouldPanic: true},
|
||||
{name: "TestNewManager_BufferSize1AndHandlerNotNil", bufferSize: 1, handler: func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {}, shouldPanic: false},
|
||||
}
|
||||
|
||||
time.Sleep(times.Day)
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil && !c.shouldPanic {
|
||||
t.Errorf("NewManager() should not panic, but panic: %v", r)
|
||||
}
|
||||
}()
|
||||
dispatcher.NewManager[string, *TestMessage](c.bufferSize, c.handler)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_SetDispatcherClosedHandler(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
setCloseHandler bool
|
||||
}{
|
||||
{name: "TestManager_SetDispatcherClosedHandler_Set", setCloseHandler: true},
|
||||
{name: "TestManager_SetDispatcherClosedHandler_NotSet", setCloseHandler: false},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
var closed atomic.Bool
|
||||
m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
|
||||
if c.setCloseHandler {
|
||||
m.SetDispatcherClosedHandler(func(name string) {
|
||||
closed.Store(true)
|
||||
})
|
||||
}
|
||||
m.BindProducer(c.name, c.name)
|
||||
m.UnBindProducer(c.name)
|
||||
m.Wait()
|
||||
if c.setCloseHandler && !closed.Load() {
|
||||
t.Errorf("SetDispatcherClosedHandler() should be called")
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_SetDispatcherCreatedHandler(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
setCreatedHandler bool
|
||||
}{
|
||||
{name: "TestManager_SetDispatcherCreatedHandler_Set", setCreatedHandler: true},
|
||||
{name: "TestManager_SetDispatcherCreatedHandler_NotSet", setCreatedHandler: false},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
var created atomic.Bool
|
||||
m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
|
||||
if c.setCreatedHandler {
|
||||
m.SetDispatcherCreatedHandler(func(name string) {
|
||||
created.Store(true)
|
||||
})
|
||||
}
|
||||
m.BindProducer(c.name, c.name)
|
||||
m.UnBindProducer(c.name)
|
||||
m.Wait()
|
||||
if c.setCreatedHandler && !created.Load() {
|
||||
t.Errorf("SetDispatcherCreatedHandler() should be called")
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_HasDispatcher(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
bindName string
|
||||
has bool
|
||||
}{
|
||||
{name: "TestManager_HasDispatcher_Has", bindName: "TestManager_HasDispatcher_Has", has: true},
|
||||
{name: "TestManager_HasDispatcher_NotHas", bindName: "TestManager_HasDispatcher_NotHas", has: false},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
|
||||
m.BindProducer(c.bindName, c.bindName)
|
||||
var cond string
|
||||
if c.has {
|
||||
cond = c.bindName
|
||||
}
|
||||
if m.HasDispatcher(cond) != c.has {
|
||||
t.Errorf("HasDispatcher() should return %v", c.has)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_GetDispatcherNum(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
num int
|
||||
}{
|
||||
{name: "TestManager_GetDispatcherNum_N1", num: -1},
|
||||
{name: "TestManager_GetDispatcherNum_0", num: 0},
|
||||
{name: "TestManager_GetDispatcherNum_1", num: 1},
|
||||
{name: "TestManager_GetDispatcherNum_2", num: 2},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
|
||||
switch {
|
||||
case c.num <= 0:
|
||||
return
|
||||
case c.num == 1:
|
||||
if m.GetDispatcherNum() != 1 {
|
||||
t.Errorf("GetDispatcherNum() should return 1")
|
||||
}
|
||||
return
|
||||
default:
|
||||
for i := 0; i < c.num-1; i++ {
|
||||
m.BindProducer(fmt.Sprintf("%s_%d", c.name, i), fmt.Sprintf("%s_%d", c.name, i))
|
||||
}
|
||||
if m.GetDispatcherNum() != c.num {
|
||||
t.Errorf("GetDispatcherNum() should return %v", c.num)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_GetSystemDispatcher(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
}{
|
||||
{name: "TestManager_GetSystemDispatcher"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
|
||||
if m.GetSystemDispatcher() == nil {
|
||||
t.Errorf("GetSystemDispatcher() should not return nil")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_GetDispatcher(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
bindName string
|
||||
}{
|
||||
{name: "TestManager_GetDispatcher", bindName: "TestManager_GetDispatcher"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
|
||||
m.BindProducer(c.bindName, c.bindName)
|
||||
if m.GetDispatcher(c.bindName) == nil {
|
||||
t.Errorf("GetDispatcher() should not return nil")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_BindProducer(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
bindName string
|
||||
}{
|
||||
{name: "TestManager_BindProducer", bindName: "TestManager_BindProducer"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
|
||||
m.BindProducer(c.bindName, c.bindName)
|
||||
if m.GetDispatcher(c.bindName) == nil {
|
||||
t.Errorf("GetDispatcher() should not return nil")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_UnBindProducer(t *testing.T) {
|
||||
var cases = []struct {
|
||||
name string
|
||||
bindName string
|
||||
}{
|
||||
{name: "TestManager_UnBindProducer", bindName: "TestManager_UnBindProducer"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {})
|
||||
m.BindProducer(c.bindName, c.bindName)
|
||||
m.UnBindProducer(c.bindName)
|
||||
if m.GetDispatcher(c.bindName) != m.GetSystemDispatcher() {
|
||||
t.Errorf("GetDispatcher() should return SystemDispatcher")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue