updated docs and calls
This commit is contained in:
8
sync.go
8
sync.go
@@ -17,11 +17,11 @@ type SyncTopic[T any] struct {
|
|||||||
subscribers []Subscriber[T]
|
subscribers []Subscriber[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSyncTopic creates a zero SyncTopic and return a pointer to it.
|
// NewSyncTopic creates a SyncTopic with the specified options.
|
||||||
func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] {
|
func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] {
|
||||||
options := TopicOptions{
|
options := TopicOptions{
|
||||||
onClose: func() {},
|
onClose: func() {},
|
||||||
onSubscribe: func(count int) {},
|
onSubscribe: func() {},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
@@ -33,7 +33,7 @@ func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close will cause future Publish and Subscribe calls to return an error.
|
// Close will prevent further publishing and subscribing.
|
||||||
func (t *SyncTopic[T]) Close() {
|
func (t *SyncTopic[T]) Close() {
|
||||||
t.closed.Store(true)
|
t.closed.Store(true)
|
||||||
t.options.onClose()
|
t.options.onClose()
|
||||||
@@ -63,7 +63,7 @@ func (t *SyncTopic[T]) Subscribe(fn Subscriber[T]) error {
|
|||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
|
|
||||||
t.subscribers = append(t.subscribers, fn)
|
t.subscribers = append(t.subscribers, fn)
|
||||||
t.options.onSubscribe(len(t.subscribers))
|
t.options.onSubscribe()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
32
sync_test.go
32
sync_test.go
@@ -69,3 +69,35 @@ func TestSyncTopic_MultiPublishersMultiSubscribers(t *testing.T) {
|
|||||||
|
|
||||||
assert.Equal(t, msgCount*subCount, feedbackCounter)
|
assert.Equal(t, msgCount*subCount, feedbackCounter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncTopic_ClosedTopicError(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
assertFn func(*SyncTopic[int])
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "publishing returns error",
|
||||||
|
assertFn: func(topic *SyncTopic[int]) {
|
||||||
|
assert.Error(t, ErrTopicClosed, topic.Publish(1))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "subscribing returns error",
|
||||||
|
assertFn: func(topic *SyncTopic[int]) {
|
||||||
|
assert.Error(t, ErrTopicClosed, topic.Subscribe(func(i int) bool { return true }))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
feedback := make(chan struct{}, 1)
|
||||||
|
defer close(feedback)
|
||||||
|
|
||||||
|
topic := NewSyncTopic[int]()
|
||||||
|
|
||||||
|
topic.Close() // this should close the topic, no more messages can be published
|
||||||
|
|
||||||
|
tc.assertFn(topic)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user