diff --git a/async.go b/async.go index ddaa8ea..c01b5b7 100644 --- a/async.go +++ b/async.go @@ -68,7 +68,6 @@ func (t *AsyncTopic[T]) run() { select { case newCallback, more := <-t.subscribeCh: if !more { - // Ignore subscribeCh close. The publishCh will dictate when to exit this loop. drainedSubscribe = true break } @@ -78,21 +77,11 @@ func (t *AsyncTopic[T]) run() { case msg, more := <-t.publishCh: if !more { - // No more published messages, promise was fulfilled and we can return drainedPublish = true break } - keepers := make([]Subscriber[T], 0, len(subscribers)) - - for _, callback := range subscribers { - keep := callback(msg) - if keep { - keepers = append(keepers, callback) - } - } - - subscribers = keepers + subscribers = sequentialDelivery(msg, subscribers) } } } diff --git a/async_bench_test.go b/async_bench_test.go index 907a2c5..5860408 100644 --- a/async_bench_test.go +++ b/async_bench_test.go @@ -8,7 +8,7 @@ import ( ) func BenchmarkAsyncTopic_Publish(b *testing.B) { - for _, tc := range benchTestCase { + for _, tc := range publishCases { b.Run(tc.Name, func(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/async_test.go b/async_test.go index e3e1e31..461fd67 100644 --- a/async_test.go +++ b/async_test.go @@ -55,10 +55,13 @@ func TestAsyncTopic_MultiPublishersMultiSubscribers(t *testing.T) { msgCount = pubCount * 100 // total messages to publish (delivered to EACH subscriber) ) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + subscribersReady := make(chan struct{}, 1) defer close(subscribersReady) - topic := NewAsyncTopic[int](context.Background(), WithOnSubscribe(func(count int) { + topic := NewAsyncTopic[int](ctx, WithOnSubscribe(func(count int) { if count == subCount { subscribersReady <- struct{}{} } diff --git a/bench_data_test.go b/bench_data_test.go index 43865e5..646d02a 100644 --- a/bench_data_test.go +++ b/bench_data_test.go @@ -1,12 +1,14 @@ package gubgub +import "sync/atomic" + type benchSubscriberSetup struct { Name string Count int Subscriber Subscriber[int] } -var benchTestCase = []benchSubscriberSetup{ +var publishCases = []benchSubscriberSetup{ { Name: "10 NoOp Subscribers", Count: 10, @@ -45,3 +47,44 @@ func Slow(int) bool { } return true } + +var deliveryCases = []benchSubscriberSetup{ + { + Name: "10K Subscribers 0 unsubscribe", + Count: 10000, + Subscriber: NoOp[int](), + }, + { + Name: "100K Subscribers 0 unsubscribe", + Count: 100000, + Subscriber: NoOp[int](), + }, + { + Name: "10K Subscribers 10% unsubscribe", + Count: 10000, + Subscriber: Quiter(10), + }, + { + Name: "100K Subscribers 10% unsubscribe", + Count: 100000, + Subscriber: Quiter(10), + }, + { + Name: "10K Subscribers 50% unsubscribe", + Count: 10000, + Subscriber: Quiter(2), + }, + { + Name: "100K Subscribers 50% unsubscribe", + Count: 100000, + Subscriber: Quiter(2), + }, +} + +// Quiter returns a subscriber that unsubscribes nth calls. +func Quiter(nth int64) func(_ int) bool { + var c atomic.Int64 + return func(_ int) bool { + return c.Add(1)%nth != 0 + } +} diff --git a/delivery.go b/delivery.go new file mode 100644 index 0000000..3673605 --- /dev/null +++ b/delivery.go @@ -0,0 +1,27 @@ +package gubgub + +// sequentialDelivery effentiently delivers a message to each subscriber sequentially. For +// performance reasons this might mutate the subscribers slice inplace. Please overwrite it with +// the result of this call. +func sequentialDelivery[T any](msg T, subscribers []Subscriber[T]) []Subscriber[T] { + last := len(subscribers) - 1 + next := 0 + + for next <= last { + if !subscribers[next](msg) { + for last > next && !subscribers[last](msg) { + last-- + } + + if last <= next { + break + } + + subscribers[next] = subscribers[last] + last-- + } + next++ + } + + return subscribers[:next] +} diff --git a/delivery_bench_test.go b/delivery_bench_test.go new file mode 100644 index 0000000..72e0526 --- /dev/null +++ b/delivery_bench_test.go @@ -0,0 +1,28 @@ +package gubgub + +import "testing" + +func BenchmarkSequentialDelivery(b *testing.B) { + for _, tc := range deliveryCases { + b.Run(tc.Name, func(b *testing.B) { + subscribers := make([]Subscriber[int], 0, tc.Count) + + for range tc.Count { + subscribers = append(subscribers, tc.Subscriber) + } + + b.ResetTimer() + + for i := range b.N { + b.StartTimer() + subscribers = sequentialDelivery(i, subscribers) + b.StopTimer() + + // replenish subscribers + for len(subscribers) < tc.Count { + subscribers = append(subscribers, tc.Subscriber) + } + } + }) + } +} diff --git a/delivery_test.go b/delivery_test.go new file mode 100644 index 0000000..96788c9 --- /dev/null +++ b/delivery_test.go @@ -0,0 +1,64 @@ +package gubgub + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSequentialDelivery(t *testing.T) { + const testMsg = 9786 + + feedback := make([]int, 0, 3) + + subscribers := []Subscriber[int]{ + Once(func(msg int) { + assert.Equalf(t, testMsg, msg, "expected %d but got %d", testMsg, msg) + feedback = append(feedback, 1) + }), + Once(func(msg int) { + assert.Equalf(t, testMsg, msg, "expected %d but got %d", testMsg, msg) + feedback = append(feedback, 2) + }), + Once(func(msg int) { + assert.Equalf(t, testMsg, msg, "expected %d but got %d", testMsg, msg) + feedback = append(feedback, 3) + }), + Forever(func(msg int) { + assert.Equalf(t, testMsg, msg, "expected %d but got %d", testMsg, msg) + feedback = append(feedback, 4) + }), + } + + nextSubscribers := sequentialDelivery(testMsg, subscribers) + + assert.Len(t, nextSubscribers, 1, "expected to have 1 subscriber") + assert.Len(t, feedback, 4, "one or more subscriber was not called") + + finalSubscribers := sequentialDelivery(testMsg, nextSubscribers) + + assert.Len(t, finalSubscribers, len(nextSubscribers), "expected to have the same subscribers") + assert.Len(t, feedback, 5, "one or more subscriber was not called") + + assertContainsExactlyN(t, 1, 1, feedback) + assertContainsExactlyN(t, 2, 1, feedback) + assertContainsExactlyN(t, 3, 1, feedback) + assertContainsExactlyN(t, 4, 2, feedback) +} + +func assertContainsExactlyN[T comparable](t testing.TB, exp T, n int, slice []T) { + t.Helper() + + var found int + for _, v := range slice { + if exp == v { + found++ + } + } + + if n > found { + t.Errorf("contains too few '%v': expected %d but found %d", exp, n, found) + } else if n < found { + t.Errorf("contains too many '%v': expected %d but found %d", exp, n, found) + } +} diff --git a/sync.go b/sync.go index a426543..45dce80 100644 --- a/sync.go +++ b/sync.go @@ -2,8 +2,8 @@ package gubgub import "sync" -// SyncTopic allows any message T to be broadcast to subscribers. Publishing and Subscribing -// happens synchronously (block). +// SyncTopic is the simplest and most naive topic. It allows any message T to be broadcast to +// subscribers. Publishing and Subscribing happens synchronously (block). type SyncTopic[T any] struct { mu sync.Mutex subscribers []Subscriber[T] @@ -19,16 +19,7 @@ func (t *SyncTopic[T]) Publish(msg T) error { t.mu.Lock() defer t.mu.Unlock() - keepers := make([]Subscriber[T], 0, len(t.subscribers)) - - for _, callback := range t.subscribers { - keep := callback(msg) - if keep { - keepers = append(keepers, callback) - } - } - - t.subscribers = keepers + t.subscribers = sequentialDelivery(msg, t.subscribers) return nil } diff --git a/sync_bench_test.go b/sync_bench_test.go index 1b89696..c2811e7 100644 --- a/sync_bench_test.go +++ b/sync_bench_test.go @@ -7,7 +7,7 @@ import ( ) func BenchmarkSyncTopic_Publish(b *testing.B) { - for _, tc := range benchTestCase { + for _, tc := range publishCases { b.Run(tc.Name, func(b *testing.B) { topic := NewSyncTopic[int]() diff --git a/wrappers.go b/wrappers.go index c6f2a73..944e119 100644 --- a/wrappers.go +++ b/wrappers.go @@ -1,8 +1,7 @@ package gubgub -// Forever wrapper makes it more explicit that a subscriber will never stop consuming messages. -// This helps avoiding subscribers that always return true which, depending on their size, might -// not be immediately clear. +// Forever wraps a subscriber that will never stop consuming messages. +// This helps avoiding subscribers that always return TRUE. func Forever[T any](fn func(T)) Subscriber[T] { return func(msg T) bool { fn(msg) @@ -10,7 +9,17 @@ func Forever[T any](fn func(T)) Subscriber[T] { } } -// NoOp creates a sbscriber that does absolutely nothing forever. This is mostly useful for testing. +// Once wraps a subscriber that will consume only one message. +// This helps avoiding subscribers that always return FALSE. +func Once[T any](fn func(T)) Subscriber[T] { + return func(t T) bool { + fn(t) + return false + } +} + +// NoOp creates a subscriber that does absolutely nothing forever. +// This is mostly useful for testing. func NoOp[T any]() Subscriber[T] { return func(_ T) bool { return true } }