diff --git a/feed_test.go b/feed_test.go index 641b67a..ffa44f6 100644 --- a/feed_test.go +++ b/feed_test.go @@ -13,7 +13,7 @@ func TestFeed_Topics(t *testing.T) { subscriberReady := make(chan struct{}, 1) defer close(subscriberReady) - onSubscribe := WithOnSubscribe(func(count int) { + onSubscribe := WithOnSubscribe(func() { subscriberReady <- struct{}{} }) diff --git a/options.go b/options.go index eb8201e..06e7b04 100644 --- a/options.go +++ b/options.go @@ -1,23 +1,29 @@ package gubgub +import ( + "sync" +) + // TopicOptions holds common options for topics. type TopicOptions struct { - // onClose is called after the Topic is closed and all messages have been delivered. + // onClose is called after the Topic is closed and all messages have been delivered. Even + // though you might call Close multiple times, topics are effectively closed only once thus + // this should be called only once. onClose func() // onSubscribe is called after a new subscriber is regitered. - onSubscribe func(count int) + onSubscribe func() } type TopicOption func(*TopicOptions) func WithOnClose(fn func()) TopicOption { return func(opts *TopicOptions) { - opts.onClose = fn + opts.onClose = sync.OnceFunc(fn) } } -func WithOnSubscribe(fn func(count int)) TopicOption { +func WithOnSubscribe(fn func()) TopicOption { return func(opts *TopicOptions) { opts.onSubscribe = fn } diff --git a/options_test.go b/options_test.go new file mode 100644 index 0000000..3ff3f72 --- /dev/null +++ b/options_test.go @@ -0,0 +1,95 @@ +package gubgub + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestWithOnClose(t *testing.T) { + type closable interface { + Close() + } + + feedback := make(chan struct{}, 1) + defer close(feedback) + onClose := WithOnClose(func() { feedback <- struct{}{} }) + + testCases := []struct { + name string + topic closable + }{ + { + name: "sync topic", + topic: NewSyncTopic[int](onClose), + }, + { + name: "async topic", + topic: NewAsyncTopic[int](onClose), + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + go tc.topic.Close() + + select { + case <-feedback: + break + + case <-testTimer(t, time.Second).C: + t.Fatalf("expected feedback by now") + } + }) + } +} + +func TestWithOnSubscribe(t *testing.T) { + const totalSub = 10 + + feedback := make(chan int, totalSub) + defer close(feedback) + + var counter int + onSubscribe := WithOnSubscribe(func() { + counter++ + feedback <- counter + }) + + testCases := []struct { + name string + topic Subscribable[int] + }{ + { + name: "sync topic", + topic: NewSyncTopic[int](onSubscribe), + }, + { + name: "async topic", + topic: NewAsyncTopic[int](onSubscribe), + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + counter = 0 + + for range totalSub { + tc.topic.Subscribe(func(i int) bool { return true }) + } + + count := 0 + timeout := testTimer(t, time.Second) + + for count < totalSub { + select { + case c := <-feedback: + count++ + assert.Equal(t, count, c, "expected %d but got %d instead", count, c) + + case <-timeout.C: + t.Fatalf("expected %d feedback items by now but only got %d", totalSub, count) + } + } + }) + } +}