From 1186e691214afc4a39435435a8fbdfd9b6627503 Mon Sep 17 00:00:00 2001 From: Natercio Moniz Date: Tue, 6 Aug 2024 15:15:21 +0100 Subject: [PATCH] include existing logic --- async.go | 116 ++++++++++++++++++++++++++++++++++++ async_test.go | 158 ++++++++++++++++++++++++++++++++++++++++++++++++++ common.go | 18 ++++++ errors.go | 5 ++ go.mod | 8 +++ go.sum | 10 ++++ wrappers.go | 11 ++++ 7 files changed, 326 insertions(+) create mode 100644 async.go create mode 100644 async_test.go create mode 100644 common.go create mode 100644 errors.go create mode 100644 go.sum create mode 100644 wrappers.go diff --git a/async.go b/async.go new file mode 100644 index 0000000..6259871 --- /dev/null +++ b/async.go @@ -0,0 +1,116 @@ +package gubgub + +import ( + "context" + "fmt" + "sync" +) + +// AsyncTopic allows any message T to be broadcast to all subscribers. +type AsyncTopic[T any] struct { + options AsyncTopicOptions + + mu sync.RWMutex + closed bool + + publishCh chan T + subscribeCh chan Subscriber[T] +} + +// NewAsyncTopic creates a Topic that publishes messages asynchronously. +func NewAsyncTopic[T any](ctx context.Context, opts ...AsyncTopicOption) *AsyncTopic[T] { + options := AsyncTopicOptions{ + onClose: func() {}, + } + + for _, opt := range opts { + opt(&options) + } + + t := AsyncTopic[T]{ + options: options, + publishCh: make(chan T, 1), + subscribeCh: make(chan Subscriber[T]), + } + + go t.run(ctx) + + return &t +} + +func (t *AsyncTopic[T]) run(ctx context.Context) { + defer close(t.publishCh) + defer close(t.subscribeCh) + defer t.close() + + var subscribers []Subscriber[T] + + for { + select { + case <-ctx.Done(): + return + + case newCallback := <-t.subscribeCh: + subscribers = append(subscribers, newCallback) + + case msg := <-t.publishCh: + keepers := make([]Subscriber[T], 0, len(subscribers)) + + for _, callback := range subscribers { + keep := callback(msg) + if keep { + keepers = append(keepers, callback) + } + } + + subscribers = keepers + } + } +} + +// Publish broadcasts a msg to all subscribers. +func (t *AsyncTopic[T]) Publish(msg T) error { + t.mu.RLock() + defer t.mu.RUnlock() + + if t.closed { + return fmt.Errorf("async topic publish: %w", ErrTopicClosed) + } + + t.publishCh <- msg + return nil +} + +// Subscribe adds a Subscriber func that will consume future published messages. +func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error { + t.mu.RLock() + defer t.mu.RUnlock() + + if t.closed { + return fmt.Errorf("async topic subscribe: %w", ErrTopicClosed) + // return nil + } + + t.subscribeCh <- fn + return nil +} + +func (t *AsyncTopic[T]) close() { + t.mu.Lock() + t.closed = true + t.mu.Unlock() + + t.options.onClose() +} + +type AsyncTopicOptions struct { + onClose func() +} + +type AsyncTopicOption func(*AsyncTopicOptions) + +func WithOnClose(fn func()) AsyncTopicOption { + return func(a *AsyncTopicOptions) { + a.onClose = fn + } +} diff --git a/async_test.go b/async_test.go new file mode 100644 index 0000000..e5447d3 --- /dev/null +++ b/async_test.go @@ -0,0 +1,158 @@ +package gubgub + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAsyncTopic_SinglePublisherSingleSubscriber(t *testing.T) { + const msgCount = 5 + + topic := NewAsyncTopic[int](context.Background()) + + feedback := make(chan int, msgCount) + defer close(feedback) + + err := topic.Subscribe(func(i int) bool { + feedback <- i + return true + }) + require.NoError(t, err) + + for i := 0; i < msgCount; i++ { + require.NoError(t, topic.Publish(i)) + } + + count := 0 + for count < msgCount { + select { + case f := <-feedback: + assert.Equalf(t, count, f, "expected to get %d but got %d instead", count, f) + count++ + + case <-time.After(time.Second): + t.Fatalf("expected %d feedback items by now but only got %d", msgCount, count) + } + } +} + +func TestAsyncTopic_MultiPublishersMultiSubscribers(t *testing.T) { + const ( + subCount = 10 + pubCount = 10 + msgCount = pubCount * 100 // total messages to publish (delivered to EACH subscriber) + ) + + topic := NewAsyncTopic[int](context.Background()) + + expFeedbackCount := msgCount * subCount + feedback := make(chan int, expFeedbackCount) + defer close(feedback) + + for range subCount { + err := topic.Subscribe(func(i int) bool { + feedback <- i + return true + }) + require.NoError(t, err) + } + + toDeliver := make(chan int, msgCount) + for i := range msgCount { + toDeliver <- i + } + close(toDeliver) + + for range pubCount { + go func() { + for msg := range toDeliver { + require.NoError(t, topic.Publish(msg)) + } + }() + } + + count := 0 + for count != expFeedbackCount { + select { + case <-feedback: + count++ + + case <-time.After(time.Second): + t.Fatalf("expected %d feedback items by now but only got %d", expFeedbackCount, count) + } + } +} + +func TestAsyncTopic_WithOnClose(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + feedback := make(chan struct{}) + defer close(feedback) + + _ = NewAsyncTopic[int](ctx, WithOnClose(func() { feedback <- struct{}{} })) + + cancel() + + select { + case <-feedback: + break + + case <-time.After(time.Second): + t.Fatalf("expected feedback by now") + } +} + +func TestAsyncTopic_SubscribeClosedTopicError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + feedback := make(chan struct{}) + defer close(feedback) + + topic := NewAsyncTopic[int](ctx, WithOnClose(func() { feedback <- struct{}{} })) + + require.NoError(t, topic.Subscribe(func(i int) bool { return true })) + + cancel() // this should close the topic, no more subscribers should be accepted + + select { + case <-feedback: + break + + case <-time.After(time.Second): + t.Fatalf("expected feedback by now") + } + + require.Error(t, ErrTopicClosed, topic.Subscribe(func(i int) bool { return true })) +} + +func TestAsyncTopic_PublishClosedTopicError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + feedback := make(chan struct{}) + defer close(feedback) + + topic := NewAsyncTopic[int](ctx, WithOnClose(func() { feedback <- struct{}{} })) + + require.NoError(t, topic.Subscribe(func(i int) bool { return true })) + + require.NoError(t, topic.Publish(123)) + + cancel() // this should close the topic, no more messages can be published + + select { + case <-feedback: + break + + case <-time.After(time.Second): + t.Fatalf("expected feedback by now") + } + + require.Error(t, ErrTopicClosed, topic.Publish(1)) +} diff --git a/common.go b/common.go new file mode 100644 index 0000000..8f93f81 --- /dev/null +++ b/common.go @@ -0,0 +1,18 @@ +package gubgub + +// Subscriber is a func that processes a message and returns true if it should continue processing more messages. +type Subscriber[T any] func(T) bool + +// Topic is just a convenience interface you can expect all topics to implement. +type Topic[T any] interface { + Publishable[T] + Subscribable[T] +} + +type Publishable[T any] interface { + Publish(msg T) error +} + +type Subscribable[T any] interface { + Subscribe(Subscriber[T]) error +} diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..14f20d7 --- /dev/null +++ b/errors.go @@ -0,0 +1,5 @@ +package gubgub + +import "fmt" + +var ErrTopicClosed = fmt.Errorf("topic is closed") diff --git a/go.mod b/go.mod index c24dae3..bb21962 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,11 @@ module gitlab.com/gubgub go 1.22.5 + +require github.com/stretchr/testify v1.9.0 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..60ce688 --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/wrappers.go b/wrappers.go new file mode 100644 index 0000000..3a49ff1 --- /dev/null +++ b/wrappers.go @@ -0,0 +1,11 @@ +package gubgub + +// Forever wrapper makes it more explicit that a subscriber will never stop consuming messages. +// This helps avoiding subscribers that always return true which, depending on their size, might +// not be immediately clear. +func Forever[T any](fn func(T)) Subscriber[T] { + return func(msg T) bool { + fn(msg) + return true + } +}