AsyncTopic guarantees delivery

This commit is contained in:
2024-08-07 13:57:44 +01:00
parent 6e59540130
commit 37f444e652
2 changed files with 87 additions and 49 deletions

View File

@@ -7,8 +7,12 @@ import (
)
// AsyncTopic allows any message T to be broadcast to subscribers. Publishing as well as
// subscribing happens asynchronously. Due to the nature of async processes this cannot guarantee
// message delivery nor delivery order.
// subscribing happens asynchronously.
// This guarantees that every published message will be delivered but does NOT guarantee delivery
// order.
// In the unlikely scenario where subscribers are being queued very aggressively it is possible
// that some might never actually receive any message. Subscriber registration order is also not
// guaranteed.
type AsyncTopic[T any] struct {
options AsyncTopicOptions
@@ -20,6 +24,7 @@ type AsyncTopic[T any] struct {
}
// NewAsyncTopic creates an AsyncTopic that will be closed when the given context is cancelled.
// After closed calls to Publish or Subscribe will return an error.
func NewAsyncTopic[T any](ctx context.Context, opts ...AsyncTopicOption) *AsyncTopic[T] {
options := AsyncTopicOptions{
onClose: func() {},
@@ -33,29 +38,43 @@ func NewAsyncTopic[T any](ctx context.Context, opts ...AsyncTopicOption) *AsyncT
t := AsyncTopic[T]{
options: options,
publishCh: make(chan T, 1),
subscribeCh: make(chan Subscriber[T]),
subscribeCh: make(chan Subscriber[T], 1),
}
go t.run(ctx)
go t.closer(ctx)
go t.run()
return &t
}
func (t *AsyncTopic[T]) run(ctx context.Context) {
defer t.close()
func (t *AsyncTopic[T]) closer(ctx context.Context) {
<-ctx.Done()
t.mu.Lock()
t.closed = true // no more subscribing or publishing
t.mu.Unlock()
close(t.publishCh)
close(t.subscribeCh)
t.options.onClose()
}
func (t *AsyncTopic[T]) run() {
var subscribers []Subscriber[T]
for {
select {
case <-ctx.Done():
return
case newCallback := <-t.subscribeCh:
subscribers = append(subscribers, newCallback)
t.options.onSubscribe(len(subscribers))
case msg := <-t.publishCh:
case msg, more := <-t.publishCh:
if !more {
// No more published messages, promise was fulfilled and we can return
return
}
keepers := make([]Subscriber[T], 0, len(subscribers))
for _, callback := range subscribers {
@@ -104,37 +123,6 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
return nil
}
func (t *AsyncTopic[T]) close() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for range t.publishCh {
// drain publishCh to release all read locks
}
}()
go func() {
defer wg.Done()
for range t.subscribeCh {
// drain subscribeCh to release all read locks
}
}()
t.mu.Lock()
t.closed = true // no more subscribing or publishing
t.mu.Unlock()
close(t.publishCh)
close(t.subscribeCh)
wg.Wait()
t.options.onClose()
}
type AsyncTopicOptions struct {
onClose func()
onSubscribe func(count int)

View File

@@ -10,9 +10,9 @@ import (
)
func TestAsyncTopic_SinglePublisherSingleSubscriber(t *testing.T) {
const msgCount = 100
const msgCount = 10
subscriberReady := make(chan struct{})
subscriberReady := make(chan struct{}, 1)
defer close(subscriberReady)
topic := NewAsyncTopic[int](context.Background(), WithOnSubscribe(func(count int) {
@@ -35,12 +35,14 @@ func TestAsyncTopic_SinglePublisherSingleSubscriber(t *testing.T) {
}
count := 0
timeout := time.After(time.Second)
for count < msgCount {
select {
case <-feedback:
count++
case <-time.After(time.Second):
case <-timeout:
t.Fatalf("expected %d feedback items by now but only got %d", msgCount, count)
}
}
@@ -53,7 +55,7 @@ func TestAsyncTopic_MultiPublishersMultiSubscribers(t *testing.T) {
msgCount = pubCount * 100 // total messages to publish (delivered to EACH subscriber)
)
subscribersReady := make(chan struct{})
subscribersReady := make(chan struct{}, 1)
defer close(subscribersReady)
topic := NewAsyncTopic[int](context.Background(), WithOnSubscribe(func(count int) {
@@ -91,12 +93,14 @@ func TestAsyncTopic_MultiPublishersMultiSubscribers(t *testing.T) {
}
count := 0
timeout := time.After(time.Second)
for count != expFeedbackCount {
select {
case <-feedback:
count++
case <-time.After(time.Second):
case <-timeout:
t.Fatalf("expected %d feedback items by now but only got %d", expFeedbackCount, count)
}
}
@@ -106,7 +110,7 @@ func TestAsyncTopic_WithOnClose(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
feedback := make(chan struct{})
feedback := make(chan struct{}, 1)
defer close(feedback)
_ = NewAsyncTopic[int](ctx, WithOnClose(func() { feedback <- struct{}{} }))
@@ -138,13 +142,15 @@ func TestAsyncTopic_WithOnSubscribe(t *testing.T) {
}
count := 0
timeout := time.After(time.Second)
for count < totalSub {
select {
case c := <-feedback:
count++
assert.Equal(t, count, c, "expected %d but got %d instead", count, c)
case <-time.After(time.Second):
case <-timeout:
t.Fatalf("expected %d feedback items by now but only got %d", totalSub, count)
}
}
@@ -173,7 +179,7 @@ func TestAsyncTopic_ClosedTopicError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
feedback := make(chan struct{})
feedback := make(chan struct{}, 1)
defer close(feedback)
topic := NewAsyncTopic[int](ctx, WithOnClose(func() { feedback <- struct{}{} }))
@@ -192,3 +198,47 @@ func TestAsyncTopic_ClosedTopicError(t *testing.T) {
})
}
}
func TestAsyncTopic_AllPublishedBeforeClosedAreDeliveredAfterClosed(t *testing.T) {
const msgCount = 10
subscriberReady := make(chan struct{}, 1)
defer close(subscriberReady)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
topic := NewAsyncTopic[int](ctx, WithOnSubscribe(func(count int) {
subscriberReady <- struct{}{}
}))
feedback := make(chan int) // unbuffered will cause choke point for publishers
defer close(feedback)
err := topic.Subscribe(func(i int) bool {
feedback <- i
return true
})
require.NoError(t, err)
<-subscriberReady
for i := range msgCount {
require.NoError(t, topic.Publish(i))
}
cancel()
values := make(map[int]struct{}, msgCount)
timeout := time.After(time.Second)
for len(values) < msgCount {
select {
case f := <-feedback:
values[f] = struct{}{}
case <-timeout:
t.Fatalf("expected %d unique feedback values by now but only got %d", msgCount, len(values))
}
}
}