diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f47cb20 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.out diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..19a411c --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,13 @@ +image: golang:latest + +stages: + - test + +format: + stage: test + script: + - go fmt gitlab.com/naterciom/gubgub + - go vet gitlab.com/naterciom/gubgub + - go test -race gitlab.com/naterciom/gubgub + + diff --git a/async.go b/async.go index 6259871..a0c18e0 100644 --- a/async.go +++ b/async.go @@ -20,7 +20,8 @@ type AsyncTopic[T any] struct { // NewAsyncTopic creates a Topic that publishes messages asynchronously. func NewAsyncTopic[T any](ctx context.Context, opts ...AsyncTopicOption) *AsyncTopic[T] { options := AsyncTopicOptions{ - onClose: func() {}, + onClose: func() {}, + onSubscribe: func(count int) {}, } for _, opt := range opts { @@ -52,6 +53,7 @@ func (t *AsyncTopic[T]) run(ctx context.Context) { case newCallback := <-t.subscribeCh: subscribers = append(subscribers, newCallback) + t.options.onSubscribe(len(subscribers)) case msg := <-t.publishCh: keepers := make([]Subscriber[T], 0, len(subscribers)) @@ -88,7 +90,6 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error { if t.closed { return fmt.Errorf("async topic subscribe: %w", ErrTopicClosed) - // return nil } t.subscribeCh <- fn @@ -104,13 +105,20 @@ func (t *AsyncTopic[T]) close() { } type AsyncTopicOptions struct { - onClose func() + onClose func() + onSubscribe func(count int) } type AsyncTopicOption func(*AsyncTopicOptions) func WithOnClose(fn func()) AsyncTopicOption { - return func(a *AsyncTopicOptions) { - a.onClose = fn + return func(opts *AsyncTopicOptions) { + opts.onClose = fn + } +} + +func WithOnSubscribe(fn func(count int)) AsyncTopicOption { + return func(opts *AsyncTopicOptions) { + opts.onSubscribe = fn } } diff --git a/async_test.go b/async_test.go index e5447d3..830187d 100644 --- a/async_test.go +++ b/async_test.go @@ -107,6 +107,34 @@ func TestAsyncTopic_WithOnClose(t *testing.T) { } } +func TestAsyncTopic_WithOnSubscribe(t *testing.T) { + const totalSub = 10 + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + feedback := make(chan int, totalSub) + defer close(feedback) + + topic := NewAsyncTopic[int](ctx, WithOnSubscribe(func(count int) { feedback <- count })) + + for range totalSub { + topic.Subscribe(func(i int) bool { return true }) + } + + count := 0 + for count < totalSub { + select { + case c := <-feedback: + count++ + assert.Equal(t, count, c, "expected %d but got %d instead", count, c) + + case <-time.After(time.Second): + t.Fatalf("expected %d feedback items by now but only got %d", totalSub, count) + } + } +} + func TestAsyncTopic_SubscribeClosedTopicError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()