Merge branch 'improved-delivery' into 'main'

Improved delivery of messages

See merge request naterciom/gubgub!1
This commit is contained in:
2024-08-22 16:25:16 +00:00
10 changed files with 186 additions and 32 deletions

View File

@@ -68,7 +68,6 @@ func (t *AsyncTopic[T]) run() {
select {
case newCallback, more := <-t.subscribeCh:
if !more {
// Ignore subscribeCh close. The publishCh will dictate when to exit this loop.
drainedSubscribe = true
break
}
@@ -78,21 +77,11 @@ func (t *AsyncTopic[T]) run() {
case msg, more := <-t.publishCh:
if !more {
// No more published messages, promise was fulfilled and we can return
drainedPublish = true
break
}
keepers := make([]Subscriber[T], 0, len(subscribers))
for _, callback := range subscribers {
keep := callback(msg)
if keep {
keepers = append(keepers, callback)
}
}
subscribers = keepers
subscribers = sequentialDelivery(msg, subscribers)
}
}
}

View File

@@ -8,7 +8,7 @@ import (
)
func BenchmarkAsyncTopic_Publish(b *testing.B) {
for _, tc := range benchTestCase {
for _, tc := range publishCases {
b.Run(tc.Name, func(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())

View File

@@ -55,10 +55,13 @@ func TestAsyncTopic_MultiPublishersMultiSubscribers(t *testing.T) {
msgCount = pubCount * 100 // total messages to publish (delivered to EACH subscriber)
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
subscribersReady := make(chan struct{}, 1)
defer close(subscribersReady)
topic := NewAsyncTopic[int](context.Background(), WithOnSubscribe(func(count int) {
topic := NewAsyncTopic[int](ctx, WithOnSubscribe(func(count int) {
if count == subCount {
subscribersReady <- struct{}{}
}

View File

@@ -1,12 +1,14 @@
package gubgub
import "sync/atomic"
type benchSubscriberSetup struct {
Name string
Count int
Subscriber Subscriber[int]
}
var benchTestCase = []benchSubscriberSetup{
var publishCases = []benchSubscriberSetup{
{
Name: "10 NoOp Subscribers",
Count: 10,
@@ -45,3 +47,44 @@ func Slow(int) bool {
}
return true
}
var deliveryCases = []benchSubscriberSetup{
{
Name: "10K Subscribers 0 unsubscribe",
Count: 10000,
Subscriber: NoOp[int](),
},
{
Name: "100K Subscribers 0 unsubscribe",
Count: 100000,
Subscriber: NoOp[int](),
},
{
Name: "10K Subscribers 10% unsubscribe",
Count: 10000,
Subscriber: Quiter(10),
},
{
Name: "100K Subscribers 10% unsubscribe",
Count: 100000,
Subscriber: Quiter(10),
},
{
Name: "10K Subscribers 50% unsubscribe",
Count: 10000,
Subscriber: Quiter(2),
},
{
Name: "100K Subscribers 50% unsubscribe",
Count: 100000,
Subscriber: Quiter(2),
},
}
// Quiter returns a subscriber that unsubscribes nth calls.
func Quiter(nth int64) func(_ int) bool {
var c atomic.Int64
return func(_ int) bool {
return c.Add(1)%nth != 0
}
}

27
delivery.go Normal file
View File

@@ -0,0 +1,27 @@
package gubgub
// sequentialDelivery effentiently delivers a message to each subscriber sequentially. For
// performance reasons this might mutate the subscribers slice inplace. Please overwrite it with
// the result of this call.
func sequentialDelivery[T any](msg T, subscribers []Subscriber[T]) []Subscriber[T] {
last := len(subscribers) - 1
next := 0
for next <= last {
if !subscribers[next](msg) {
for last > next && !subscribers[last](msg) {
last--
}
if last <= next {
break
}
subscribers[next] = subscribers[last]
last--
}
next++
}
return subscribers[:next]
}

28
delivery_bench_test.go Normal file
View File

@@ -0,0 +1,28 @@
package gubgub
import "testing"
func BenchmarkSequentialDelivery(b *testing.B) {
for _, tc := range deliveryCases {
b.Run(tc.Name, func(b *testing.B) {
subscribers := make([]Subscriber[int], 0, tc.Count)
for range tc.Count {
subscribers = append(subscribers, tc.Subscriber)
}
b.ResetTimer()
for i := range b.N {
b.StartTimer()
subscribers = sequentialDelivery(i, subscribers)
b.StopTimer()
// replenish subscribers
for len(subscribers) < tc.Count {
subscribers = append(subscribers, tc.Subscriber)
}
}
})
}
}

64
delivery_test.go Normal file
View File

@@ -0,0 +1,64 @@
package gubgub
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestSequentialDelivery(t *testing.T) {
const testMsg = 9786
feedback := make([]int, 0, 3)
subscribers := []Subscriber[int]{
Once(func(msg int) {
assert.Equalf(t, testMsg, msg, "expected %d but got %d", testMsg, msg)
feedback = append(feedback, 1)
}),
Once(func(msg int) {
assert.Equalf(t, testMsg, msg, "expected %d but got %d", testMsg, msg)
feedback = append(feedback, 2)
}),
Once(func(msg int) {
assert.Equalf(t, testMsg, msg, "expected %d but got %d", testMsg, msg)
feedback = append(feedback, 3)
}),
Forever(func(msg int) {
assert.Equalf(t, testMsg, msg, "expected %d but got %d", testMsg, msg)
feedback = append(feedback, 4)
}),
}
nextSubscribers := sequentialDelivery(testMsg, subscribers)
assert.Len(t, nextSubscribers, 1, "expected to have 1 subscriber")
assert.Len(t, feedback, 4, "one or more subscriber was not called")
finalSubscribers := sequentialDelivery(testMsg, nextSubscribers)
assert.Len(t, finalSubscribers, len(nextSubscribers), "expected to have the same subscribers")
assert.Len(t, feedback, 5, "one or more subscriber was not called")
assertContainsExactlyN(t, 1, 1, feedback)
assertContainsExactlyN(t, 2, 1, feedback)
assertContainsExactlyN(t, 3, 1, feedback)
assertContainsExactlyN(t, 4, 2, feedback)
}
func assertContainsExactlyN[T comparable](t testing.TB, exp T, n int, slice []T) {
t.Helper()
var found int
for _, v := range slice {
if exp == v {
found++
}
}
if n > found {
t.Errorf("contains too few '%v': expected %d but found %d", exp, n, found)
} else if n < found {
t.Errorf("contains too many '%v': expected %d but found %d", exp, n, found)
}
}

15
sync.go
View File

@@ -2,8 +2,8 @@ package gubgub
import "sync"
// SyncTopic allows any message T to be broadcast to subscribers. Publishing and Subscribing
// happens synchronously (block).
// SyncTopic is the simplest and most naive topic. It allows any message T to be broadcast to
// subscribers. Publishing and Subscribing happens synchronously (block).
type SyncTopic[T any] struct {
mu sync.Mutex
subscribers []Subscriber[T]
@@ -19,16 +19,7 @@ func (t *SyncTopic[T]) Publish(msg T) error {
t.mu.Lock()
defer t.mu.Unlock()
keepers := make([]Subscriber[T], 0, len(t.subscribers))
for _, callback := range t.subscribers {
keep := callback(msg)
if keep {
keepers = append(keepers, callback)
}
}
t.subscribers = keepers
t.subscribers = sequentialDelivery(msg, t.subscribers)
return nil
}

View File

@@ -7,7 +7,7 @@ import (
)
func BenchmarkSyncTopic_Publish(b *testing.B) {
for _, tc := range benchTestCase {
for _, tc := range publishCases {
b.Run(tc.Name, func(b *testing.B) {
topic := NewSyncTopic[int]()

View File

@@ -1,8 +1,7 @@
package gubgub
// Forever wrapper makes it more explicit that a subscriber will never stop consuming messages.
// This helps avoiding subscribers that always return true which, depending on their size, might
// not be immediately clear.
// Forever wraps a subscriber that will never stop consuming messages.
// This helps avoiding subscribers that always return TRUE.
func Forever[T any](fn func(T)) Subscriber[T] {
return func(msg T) bool {
fn(msg)
@@ -10,7 +9,17 @@ func Forever[T any](fn func(T)) Subscriber[T] {
}
}
// NoOp creates a sbscriber that does absolutely nothing forever. This is mostly useful for testing.
// Once wraps a subscriber that will consume only one message.
// This helps avoiding subscribers that always return FALSE.
func Once[T any](fn func(T)) Subscriber[T] {
return func(t T) bool {
fn(t)
return false
}
}
// NoOp creates a subscriber that does absolutely nothing forever.
// This is mostly useful for testing.
func NoOp[T any]() Subscriber[T] {
return func(_ T) bool { return true }
}