major refactor to accommodate feed

This commit is contained in:
2024-09-09 11:22:00 +01:00
parent 4741acc016
commit a394dab041
9 changed files with 360 additions and 223 deletions

125
async.go
View File

@@ -1,9 +1,7 @@
package gubgub package gubgub
import ( import (
"context"
"fmt" "fmt"
"iter"
"sync" "sync"
) )
@@ -15,10 +13,11 @@ import (
// that some might never actually receive any message. Subscriber registration order is also not // that some might never actually receive any message. Subscriber registration order is also not
// guaranteed. // guaranteed.
type AsyncTopic[T any] struct { type AsyncTopic[T any] struct {
options AsyncTopicOptions options TopicOptions
mu sync.RWMutex mu sync.RWMutex
closed bool closing bool
closed chan struct{}
publishCh chan T publishCh chan T
subscribeCh chan Subscriber[T] subscribeCh chan Subscriber[T]
@@ -26,8 +25,8 @@ type AsyncTopic[T any] struct {
// NewAsyncTopic creates an AsyncTopic that will be closed when the given context is cancelled. // NewAsyncTopic creates an AsyncTopic that will be closed when the given context is cancelled.
// After closed calls to Publish or Subscribe will return an error. // After closed calls to Publish or Subscribe will return an error.
func NewAsyncTopic[T any](ctx context.Context, opts ...AsyncTopicOption) *AsyncTopic[T] { func NewAsyncTopic[T any](opts ...TopicOption) *AsyncTopic[T] {
options := AsyncTopicOptions{ options := TopicOptions{
onClose: func() {}, // Called after the Topic is closed and all messages have been delivered. onClose: func() {}, // Called after the Topic is closed and all messages have been delivered.
onSubscribe: func(count int) {}, // Called everytime a new subscriber is added onSubscribe: func(count int) {}, // Called everytime a new subscriber is added
} }
@@ -38,34 +37,46 @@ func NewAsyncTopic[T any](ctx context.Context, opts ...AsyncTopicOption) *AsyncT
t := AsyncTopic[T]{ t := AsyncTopic[T]{
options: options, options: options,
closed: make(chan struct{}),
publishCh: make(chan T, 1), publishCh: make(chan T, 1),
subscribeCh: make(chan Subscriber[T], 1), subscribeCh: make(chan Subscriber[T], 1),
} }
go t.closer(ctx)
go t.run() go t.run()
return &t return &t
} }
func (t *AsyncTopic[T]) closer(ctx context.Context) { // Close terminates background go routines and prevents further publishing and subscribing. All
<-ctx.Done() // published messages are garanteed to be delivered once Close returns. This is idempotent.
func (t *AsyncTopic[T]) Close() {
t.mu.RLock()
closing := t.closing
t.mu.RUnlock()
if closing {
// It's either already closed or it's closing.
return
}
t.mu.Lock() t.mu.Lock()
t.closed = true // no more subscribing or publishing t.closing = true // no more subscribing or publishing
t.mu.Unlock() t.mu.Unlock()
close(t.publishCh) close(t.publishCh)
close(t.subscribeCh) close(t.subscribeCh)
<-t.closed
} }
func (t *AsyncTopic[T]) run() { func (t *AsyncTopic[T]) run() {
defer close(t.closed)
defer t.options.onClose() defer t.options.onClose()
var subscribers []Subscriber[T] var subscribers []Subscriber[T]
defer func() { defer func() {
// There is only one way to get here: the topic is now closed! // There is only one way to get here: the topic is now closing!
// Because both `subscribeCh` and `publishCh` channels are closed when the topic is closed // Because both `subscribeCh` and `publishCh` channels are closed when the topic is closed
// this will always eventually return. // this will always eventually return.
// This will deliver any potential queued message thus fulfilling the message delivery // This will deliver any potential queued message thus fulfilling the message delivery
@@ -99,7 +110,7 @@ func (t *AsyncTopic[T]) run() {
func (t *AsyncTopic[T]) Publish(msg T) error { func (t *AsyncTopic[T]) Publish(msg T) error {
t.mu.RLock() t.mu.RLock()
if t.closed { if t.closing {
t.mu.RUnlock() t.mu.RUnlock()
return fmt.Errorf("async topic publish: %w", ErrTopicClosed) return fmt.Errorf("async topic publish: %w", ErrTopicClosed)
} }
@@ -116,7 +127,7 @@ func (t *AsyncTopic[T]) Publish(msg T) error {
func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error { func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
t.mu.RLock() t.mu.RLock()
if t.closed { if t.closing {
t.mu.RUnlock() t.mu.RUnlock()
return fmt.Errorf("async topic subscribe: %w", ErrTopicClosed) return fmt.Errorf("async topic subscribe: %w", ErrTopicClosed)
} }
@@ -128,89 +139,3 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
return nil return nil
} }
// Feed allows the usage of for/range to consume future published messages.
// The supporting subscriber will eventually be discarded after you exit the for loop.
func (t *AsyncTopic[T]) Feed() iter.Seq[T] {
feed := make(chan T, 1) // closed by the middleman go routine
messages := make(chan T, 1) // closed by the subscriber
yieldReady := make(chan struct{}) // closed by the iterator
unsubscribe := make(chan struct{}) // closed by the iterator
t.Subscribe(func(msg T) bool {
select {
case messages <- msg:
return true
case <-unsubscribe:
close(messages)
return false
}
})
go func() {
defer close(feed)
q := make([]T, 0, 1)
waiting := false
for {
select {
case m, more := <-messages:
if !more {
return
}
if waiting {
waiting = false
feed <- m
} else {
q = append(q, m)
}
case _, more := <-yieldReady:
if !more {
return
}
if len(q) > 0 {
waiting = false
feed <- q[0]
q = q[1:]
} else {
waiting = true
}
}
}
}()
return func(yield func(T) bool) {
defer close(unsubscribe)
defer close(yieldReady)
for {
yieldReady <- struct{}{}
if !yield(<-feed) {
return
}
}
}
}
type AsyncTopicOptions struct {
onClose func()
onSubscribe func(count int)
}
type AsyncTopicOption func(*AsyncTopicOptions)
func WithOnClose(fn func()) AsyncTopicOption {
return func(opts *AsyncTopicOptions) {
opts.onClose = fn
}
}
func WithOnSubscribe(fn func(count int)) AsyncTopicOption {
return func(opts *AsyncTopicOptions) {
opts.onSubscribe = fn
}
}

View File

@@ -1,7 +1,6 @@
package gubgub package gubgub
import ( import (
"context"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@@ -10,26 +9,8 @@ import (
func BenchmarkAsyncTopic_Publish(b *testing.B) { func BenchmarkAsyncTopic_Publish(b *testing.B) {
for _, tc := range publishCases { for _, tc := range publishCases {
b.Run(tc.Name, func(b *testing.B) { b.Run(tc.Name, func(b *testing.B) {
onSubscribe, subscribersReady := withNotifyOnNthSubscriber(b, int64(tc.Count))
ctx, cancel := context.WithCancel(context.Background()) topic := newTestAsyncTopic[int](b, onSubscribe)
defer cancel()
subscribersReady := make(chan struct{}, 1)
defer close(subscribersReady)
topicClosed := make(chan struct{}, 1)
defer close(topicClosed)
topic := NewAsyncTopic[int](ctx,
WithOnSubscribe(func(count int) {
if count == tc.Count {
subscribersReady <- struct{}{}
}
}),
WithOnClose(func() {
topicClosed <- struct{}{}
}),
)
for range tc.Count { for range tc.Count {
require.NoError(b, topic.Subscribe(tc.Subscriber)) require.NoError(b, topic.Subscribe(tc.Subscriber))
@@ -45,10 +26,7 @@ func BenchmarkAsyncTopic_Publish(b *testing.B) {
b.StopTimer() b.StopTimer()
cancel() topic.Close()
// This just helps leaving as few running Go routines as possible when the next round starts
<-topicClosed
}) })
} }

View File

@@ -1,8 +1,7 @@
package gubgub package gubgub
import ( import (
"context" "sync/atomic"
"sync"
"testing" "testing"
"time" "time"
@@ -13,12 +12,8 @@ import (
func TestAsyncTopic_SinglePublisherSingleSubscriber(t *testing.T) { func TestAsyncTopic_SinglePublisherSingleSubscriber(t *testing.T) {
const msgCount = 10 const msgCount = 10
subscriberReady := make(chan struct{}, 1) onSubscribe, subscriberReady := withNotifyOnNthSubscriber(t, 1)
defer close(subscriberReady) topic := newTestAsyncTopic[int](t, onSubscribe)
topic := NewAsyncTopic[int](context.Background(), WithOnSubscribe(func(count int) {
subscriberReady <- struct{}{}
}))
feedback := make(chan struct{}, msgCount) feedback := make(chan struct{}, msgCount)
defer close(feedback) defer close(feedback)
@@ -56,17 +51,8 @@ func TestAsyncTopic_MultiPublishersMultiSubscribers(t *testing.T) {
msgCount = pubCount * 100 // total messages to publish (delivered to EACH subscriber) msgCount = pubCount * 100 // total messages to publish (delivered to EACH subscriber)
) )
ctx, cancel := context.WithTimeout(context.Background(), time.Second) onSubscribe, subscribersReady := withNotifyOnNthSubscriber(t, subCount)
defer cancel() topic := newTestAsyncTopic[int](t, onSubscribe)
subscribersReady := make(chan struct{}, 1)
defer close(subscribersReady)
topic := NewAsyncTopic[int](ctx, WithOnSubscribe(func(count int) {
if count == subCount {
subscribersReady <- struct{}{}
}
}))
expFeedbackCount := msgCount * subCount expFeedbackCount := msgCount * subCount
feedback := make(chan int, expFeedbackCount) feedback := make(chan int, expFeedbackCount)
@@ -111,15 +97,12 @@ func TestAsyncTopic_MultiPublishersMultiSubscribers(t *testing.T) {
} }
func TestAsyncTopic_WithOnClose(t *testing.T) { func TestAsyncTopic_WithOnClose(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
feedback := make(chan struct{}, 1) feedback := make(chan struct{}, 1)
defer close(feedback) defer close(feedback)
_ = NewAsyncTopic[int](ctx, WithOnClose(func() { feedback <- struct{}{} })) topic := NewAsyncTopic[int](WithOnClose(func() { feedback <- struct{}{} }))
cancel() topic.Close()
select { select {
case <-feedback: case <-feedback:
@@ -133,13 +116,10 @@ func TestAsyncTopic_WithOnClose(t *testing.T) {
func TestAsyncTopic_WithOnSubscribe(t *testing.T) { func TestAsyncTopic_WithOnSubscribe(t *testing.T) {
const totalSub = 10 const totalSub = 10
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
feedback := make(chan int, totalSub) feedback := make(chan int, totalSub)
defer close(feedback) defer close(feedback)
topic := NewAsyncTopic[int](ctx, WithOnSubscribe(func(count int) { feedback <- count })) topic := NewAsyncTopic[int](WithOnSubscribe(func(count int) { feedback <- count }))
for range totalSub { for range totalSub {
topic.Subscribe(func(i int) bool { return true }) topic.Subscribe(func(i int) bool { return true })
@@ -180,23 +160,12 @@ func TestAsyncTopic_ClosedTopicError(t *testing.T) {
} }
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
feedback := make(chan struct{}, 1) feedback := make(chan struct{}, 1)
defer close(feedback) defer close(feedback)
topic := NewAsyncTopic[int](ctx, WithOnClose(func() { feedback <- struct{}{} })) topic := NewAsyncTopic[int]()
cancel() // this should close the topic, no more messages can be published topic.Close() // this should close the topic, no more messages can be published
select {
case <-feedback:
break
case <-testTimer(t, time.Second).C:
t.Fatalf("expected feedback by now")
}
tc.assertFn(topic) tc.assertFn(topic)
}) })
@@ -206,17 +175,8 @@ func TestAsyncTopic_ClosedTopicError(t *testing.T) {
func TestAsyncTopic_AllPublishedBeforeClosedAreDeliveredAfterClosed(t *testing.T) { func TestAsyncTopic_AllPublishedBeforeClosedAreDeliveredAfterClosed(t *testing.T) {
const msgCount = 10 const msgCount = 10
ctx, cancel := context.WithCancel(context.Background()) onSubscribe, subscriberReady := withNotifyOnNthSubscriber(t, 1)
defer cancel() topic := newTestAsyncTopic[int](t, onSubscribe)
subscriberReady := make(chan struct{}, 1)
defer close(subscriberReady)
topic := NewAsyncTopic[int](ctx,
WithOnSubscribe(func(count int) {
subscriberReady <- struct{}{}
}),
)
feedback := make(chan int) // unbuffered will cause choke point for publishers feedback := make(chan int) // unbuffered will cause choke point for publishers
defer close(feedback) defer close(feedback)
@@ -233,7 +193,7 @@ func TestAsyncTopic_AllPublishedBeforeClosedAreDeliveredAfterClosed(t *testing.T
require.NoError(t, topic.Publish(i)) require.NoError(t, topic.Publish(i))
} }
cancel() go topic.Close()
values := make(map[int]struct{}, msgCount) values := make(map[int]struct{}, msgCount)
timeout := testTimer(t, time.Second) timeout := testTimer(t, time.Second)
@@ -249,48 +209,6 @@ func TestAsyncTopic_AllPublishedBeforeClosedAreDeliveredAfterClosed(t *testing.T
} }
} }
func TestAsyncTopic_Feed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
subscriberReady := make(chan struct{}, 1)
defer close(subscriberReady)
topic := NewAsyncTopic[int](ctx,
WithOnSubscribe(func(count int) {
subscriberReady <- struct{}{}
}),
)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
seen := make(map[int]struct{})
for i := range topic.Feed() {
seen[i] = struct{}{}
if len(seen) >= 9 {
return
}
}
}()
<-subscriberReady
wg.Add(1)
go func() {
defer wg.Done()
for i := range 10 {
topic.Publish(i)
}
}()
wg.Wait()
}
func testTimer(t testing.TB, d time.Duration) *time.Timer { func testTimer(t testing.TB, d time.Duration) *time.Timer {
t.Helper() t.Helper()
@@ -301,3 +219,28 @@ func testTimer(t testing.TB, d time.Duration) *time.Timer {
return timer return timer
} }
func newTestAsyncTopic[T any](t testing.TB, opts ...TopicOption) *AsyncTopic[T] {
t.Helper()
topic := NewAsyncTopic[T](opts...)
t.Cleanup(topic.Close)
return topic
}
func withNotifyOnNthSubscriber(t testing.TB, n int64) (TopicOption, <-chan struct{}) {
t.Helper()
notify := make(chan struct{}, 1)
t.Cleanup(func() {
close(notify)
})
var counter atomic.Int64
return WithOnSubscribe(func(count int) {
c := counter.Add(1)
if c == n {
notify <- struct{}{}
}
}), notify
}

37
feed.go Normal file
View File

@@ -0,0 +1,37 @@
package gubgub
import "iter"
// Feed allows the usage of for/range to consume future published messages.
// The supporting subscriber will eventually be discarded after you exit the for loop.
func Feed[T any](t Subscribable[T], buffered bool) iter.Seq[T] {
feed := make(chan T) // closed by the subscriber
unsubscribe := make(chan struct{}) // closed by the iterator
subscriber := func(msg T) bool {
select {
case feed <- msg:
return true
case <-unsubscribe:
close(feed)
return false
}
}
if buffered {
subscriber = Buffered(subscriber)
}
t.Subscribe(subscriber)
// Iterator
return func(yield func(T) bool) {
defer close(unsubscribe)
for msg := range feed {
if !yield(msg) {
return
}
}
}
}

62
feed_test.go Normal file
View File

@@ -0,0 +1,62 @@
package gubgub
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestFeed_Topics(t *testing.T) {
const msgCount = 10
subscriberReady := make(chan struct{}, 1)
defer close(subscriberReady)
onSubscribe := WithOnSubscribe(func(count int) {
subscriberReady <- struct{}{}
})
testCases := []struct {
name string
topic Topic[int]
}{
{
name: "sync topic",
topic: NewSyncTopic[int](onSubscribe),
},
{
name: "async topic",
topic: NewAsyncTopic[int](onSubscribe),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
feedback := make(chan int)
go func() {
for i := range Feed(tc.topic, false) {
feedback <- i
}
}()
go func() {
<-subscriberReady
for i := range msgCount {
require.NoError(t, tc.topic.Publish(i))
}
}()
var counter int
timeout := testTimer(t, time.Second)
for counter < msgCount {
select {
case <-feedback:
counter++
case <-timeout.C:
t.Fatalf("expected %d feedback values by now but only got %d", msgCount, counter)
}
}
})
}
}

24
options.go Normal file
View File

@@ -0,0 +1,24 @@
package gubgub
// TopicOptions holds common options for topics.
type TopicOptions struct {
// onClose is called after the Topic is closed and all messages have been delivered.
onClose func()
// onSubscribe is called after a new subscriber is regitered.
onSubscribe func(count int)
}
type TopicOption func(*TopicOptions)
func WithOnClose(fn func()) TopicOption {
return func(opts *TopicOptions) {
opts.onClose = fn
}
}
func WithOnSubscribe(fn func(count int)) TopicOption {
return func(opts *TopicOptions) {
opts.onSubscribe = fn
}
}

40
sync.go
View File

@@ -1,21 +1,50 @@
package gubgub package gubgub
import "sync" import (
"fmt"
"sync"
"sync/atomic"
)
// SyncTopic is the simplest and most naive topic. It allows any message T to be broadcast to // SyncTopic is the simplest and most naive topic. It allows any message T to be broadcast to
// subscribers. Publishing and Subscribing happens synchronously (block). // subscribers. Publishing and Subscribing happens synchronously (block).
type SyncTopic[T any] struct { type SyncTopic[T any] struct {
options TopicOptions
closed atomic.Bool
mu sync.Mutex mu sync.Mutex
subscribers []Subscriber[T] subscribers []Subscriber[T]
} }
// NewSyncTopic creates a zero SyncTopic and return a pointer to it. // NewSyncTopic creates a zero SyncTopic and return a pointer to it.
func NewSyncTopic[T any]() *SyncTopic[T] { func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] {
return &SyncTopic[T]{} options := TopicOptions{
onClose: func() {},
onSubscribe: func(count int) {},
}
for _, opt := range opts {
opt(&options)
}
return &SyncTopic[T]{
options: options,
}
}
// Close will cause future Publish and Subscribe calls to return an error.
func (t *SyncTopic[T]) Close() {
t.closed.Store(true)
t.options.onClose()
} }
// Publish broadcasts a message to all subscribers. // Publish broadcasts a message to all subscribers.
func (t *SyncTopic[T]) Publish(msg T) error { func (t *SyncTopic[T]) Publish(msg T) error {
if t.closed.Load() {
return fmt.Errorf("sync topic publish: %w", ErrTopicClosed)
}
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
@@ -26,10 +55,15 @@ func (t *SyncTopic[T]) Publish(msg T) error {
// Subscribe adds a Subscriber func that will consume future published messages. // Subscribe adds a Subscriber func that will consume future published messages.
func (t *SyncTopic[T]) Subscribe(fn Subscriber[T]) error { func (t *SyncTopic[T]) Subscribe(fn Subscriber[T]) error {
if t.closed.Load() {
return fmt.Errorf("sync topic subscribe: %w", ErrTopicClosed)
}
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
t.subscribers = append(t.subscribers, fn) t.subscribers = append(t.subscribers, fn)
t.options.onSubscribe(len(t.subscribers))
return nil return nil
} }

View File

@@ -23,3 +23,82 @@ func Once[T any](fn func(T)) Subscriber[T] {
func NoOp[T any]() Subscriber[T] { func NoOp[T any]() Subscriber[T] {
return func(_ T) bool { return true } return func(_ T) bool { return true }
} }
// Buffered returns a subscriber that buffers messages if they can't be delivered immediately.
// There is no artificial limit to how many items can be buffered. This is bounded only by
// available memory.
// This is useful if message publishing is surge prone and message processing is slow or
// unpredictable (for example: subscriber makes network request).
// Message average processing rate must still be higher than the average message publishing rate
// otherwise it will eventually lead to memory issues. You will need to find a better strategy to
// deal with such scenario.
func Buffered[T any](subscriber Subscriber[T]) Subscriber[T] {
unsubscribe := make(chan struct{}) // closed by the worker
ready := make(chan struct{}) // closed by the worker
messages := make(chan T) // closed by the forwarder
work := make(chan T) // closed by the middleman
// Worker calls the actual subscriber. It notifies the middleman that it's ready for the next
// message via the ready channel and then reads from the work channel.
go func() {
for w := range work {
if !subscriber(w) {
close(unsubscribe)
close(ready)
return
}
ready <- struct{}{}
}
}()
// Middleman that handles buffering. When the worker notifies that it is ready for the next
// message it will check if there is buffered messages and push the next one immediately or
// else push it when the next message arrives.
go func() {
defer close(work)
idling := true // so that the first message can go straight to the consumer
q := make([]T, 0, 1)
for {
select {
case msg, more := <-messages:
if !more {
return
}
if idling {
idling = false
work <- msg
} else {
q = append(q, msg)
}
case _, more := <-ready:
if !more {
return
}
if len(q) > 0 {
work <- q[0]
q = q[1:]
} else {
idling = true
}
}
}
}()
// forwarder just sends messages to the middleman or quits.
return func(msg T) bool {
select {
case messages <- msg:
return true
case <-unsubscribe:
close(messages)
return false
}
}
}

55
wrappers_test.go Normal file
View File

@@ -0,0 +1,55 @@
package gubgub
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestBuffered_Once(t *testing.T) {
feedback := make(chan int, 1)
s := Buffered(Once(func(i int) {
feedback <- i // buffered channel means no blocking
}))
assert.True(t, s(1234))
timeout := testTimer(t, time.Second)
select {
case i := <-feedback:
assert.Equal(t, 1234, i)
case <-timeout.C:
t.Fatalf("expected feedback value by now")
}
assert.False(t, s(4321))
}
func TestBuffered_Forever(t *testing.T) {
const msgCount = 100
feedback := make(chan int)
s := Buffered(Forever(func(i int) {
feedback <- i // unbuffered channel creates choke point (blocks) to force buffering
}))
for i := range msgCount {
assert.True(t, s(i))
}
timeout := testTimer(t, time.Second)
var count int
for count < msgCount {
select {
case <-feedback:
count++
case <-timeout.C:
t.Fatalf("expected %d feedback values by now", msgCount)
}
}
}