diff --git a/sync.go b/sync.go index 6dd426d..1959bb0 100644 --- a/sync.go +++ b/sync.go @@ -17,11 +17,11 @@ type SyncTopic[T any] struct { subscribers []Subscriber[T] } -// NewSyncTopic creates a zero SyncTopic and return a pointer to it. +// NewSyncTopic creates a SyncTopic with the specified options. func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] { options := TopicOptions{ onClose: func() {}, - onSubscribe: func(count int) {}, + onSubscribe: func() {}, } for _, opt := range opts { @@ -33,7 +33,7 @@ func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] { } } -// Close will cause future Publish and Subscribe calls to return an error. +// Close will prevent further publishing and subscribing. func (t *SyncTopic[T]) Close() { t.closed.Store(true) t.options.onClose() @@ -63,7 +63,7 @@ func (t *SyncTopic[T]) Subscribe(fn Subscriber[T]) error { defer t.mu.Unlock() t.subscribers = append(t.subscribers, fn) - t.options.onSubscribe(len(t.subscribers)) + t.options.onSubscribe() return nil } diff --git a/sync_test.go b/sync_test.go index 368970e..ea8ab14 100644 --- a/sync_test.go +++ b/sync_test.go @@ -69,3 +69,35 @@ func TestSyncTopic_MultiPublishersMultiSubscribers(t *testing.T) { assert.Equal(t, msgCount*subCount, feedbackCounter) } + +func TestSyncTopic_ClosedTopicError(t *testing.T) { + testCases := []struct { + name string + assertFn func(*SyncTopic[int]) + }{ + { + name: "publishing returns error", + assertFn: func(topic *SyncTopic[int]) { + assert.Error(t, ErrTopicClosed, topic.Publish(1)) + }, + }, + { + name: "subscribing returns error", + assertFn: func(topic *SyncTopic[int]) { + assert.Error(t, ErrTopicClosed, topic.Subscribe(func(i int) bool { return true })) + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + feedback := make(chan struct{}, 1) + defer close(feedback) + + topic := NewSyncTopic[int]() + + topic.Close() // this should close the topic, no more messages can be published + + tc.assertFn(topic) + }) + } +}