improved delivery tests

This commit is contained in:
2024-08-22 15:47:14 +01:00
parent 7fd9c030c0
commit af2003fa5e
5 changed files with 37 additions and 26 deletions

View File

@@ -69,7 +69,6 @@ func (t *AsyncTopic[T]) run() {
select { select {
case newCallback, more := <-t.subscribeCh: case newCallback, more := <-t.subscribeCh:
if !more { if !more {
// Ignore subscribeCh close. The publishCh will dictate when to exit this loop.
drainedSubscribe = true drainedSubscribe = true
break break
} }
@@ -79,7 +78,6 @@ func (t *AsyncTopic[T]) run() {
case msg, more := <-t.publishCh: case msg, more := <-t.publishCh:
if !more { if !more {
// No more published messages, promise was fulfilled and we can return
drainedPublish = true drainedPublish = true
break break
} }
@@ -123,7 +121,7 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
return nil return nil
} }
// Feed allows you to for/range to consume future published messages. The supporting subscriber will eventually be discarded after you exit the for loop. // Feed allows the usage of for/range to consume future published messages. The supporting subscriber will eventually be discarded after you exit the for loop.
func (t *AsyncTopic[T]) Feed() iter.Seq[T] { func (t *AsyncTopic[T]) Feed() iter.Seq[T] {
feed := make(chan T, 1) feed := make(chan T, 1)
done := make(chan struct{}) done := make(chan struct{})

View File

@@ -1,7 +1,8 @@
package gubgub package gubgub
// sequentialDelivery delivers a message to each subscriber sequentially. For performance reasons // sequentialDelivery effentiently delivers a message to each subscriber sequentially. For
// this might mutate the subscribers slice inplace. Please overwrite it with the result of this // performance reasons this might mutate the subscribers slice inplace. Please overwrite it with
// the result of this
// call. // call.
func sequentialDelivery[T any](msg T, subscribers []Subscriber[T]) []Subscriber[T] { func sequentialDelivery[T any](msg T, subscribers []Subscriber[T]) []Subscriber[T] {
last := len(subscribers) - 1 last := len(subscribers) - 1

View File

@@ -12,18 +12,18 @@ func TestSequentialDelivery(t *testing.T) {
feedback := make([]int, 0, 3) feedback := make([]int, 0, 3)
subscribers := []Subscriber[int]{ subscribers := []Subscriber[int]{
func(i int) bool { func(msg int) bool {
assert.Equalf(t, testMsg, i, "expected %d but got %d", testMsg, i) assert.Equalf(t, testMsg, msg, "expected %d but got %d", testMsg, msg)
feedback = append(feedback, 1) feedback = append(feedback, 1)
return true
},
func(i int) bool {
assert.Equalf(t, testMsg, i, "expected %d but got %d", testMsg, i)
feedback = append(feedback, 2)
return false return false
}, },
func(i int) bool { func(msg int) bool {
assert.Equalf(t, testMsg, i, "expected %d but got %d", testMsg, i) assert.Equalf(t, testMsg, msg, "expected %d but got %d", testMsg, msg)
feedback = append(feedback, 2)
return true
},
func(msg int) bool {
assert.Equalf(t, testMsg, msg, "expected %d but got %d", testMsg, msg)
feedback = append(feedback, 3) feedback = append(feedback, 3)
return true return true
}, },
@@ -38,4 +38,25 @@ func TestSequentialDelivery(t *testing.T) {
assert.Len(t, finalSubscribers, len(nextSubscribers), "expected to have the same subscribers") assert.Len(t, finalSubscribers, len(nextSubscribers), "expected to have the same subscribers")
assert.Len(t, feedback, 5, "one or more subscriber was not called") assert.Len(t, feedback, 5, "one or more subscriber was not called")
assertContainsExactlyN(t, 1, 1, feedback)
assertContainsExactlyN(t, 2, 2, feedback)
assertContainsExactlyN(t, 3, 2, feedback)
}
func assertContainsExactlyN[T comparable](t testing.TB, exp T, n int, slice []T) {
t.Helper()
var found int
for _, v := range slice {
if exp == v {
found++
}
}
if n > found {
t.Errorf("contains too few '%v': expected %d but found %d", exp, n, found)
} else if n < found {
t.Errorf("contains too many '%v': expected %d but found %d", exp, n, found)
}
} }

Binary file not shown.

15
sync.go
View File

@@ -2,8 +2,8 @@ package gubgub
import "sync" import "sync"
// SyncTopic allows any message T to be broadcast to subscribers. Publishing and Subscribing // SyncTopic is the simplest and most naive topic. It allows any message T to be broadcast to
// happens synchronously (block). // subscribers. Publishing and Subscribing happens synchronously (block).
type SyncTopic[T any] struct { type SyncTopic[T any] struct {
mu sync.Mutex mu sync.Mutex
subscribers []Subscriber[T] subscribers []Subscriber[T]
@@ -19,16 +19,7 @@ func (t *SyncTopic[T]) Publish(msg T) error {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
keepers := make([]Subscriber[T], 0, len(t.subscribers)) t.subscribers = sequentialDelivery(msg, t.subscribers)
for _, callback := range t.subscribers {
keep := callback(msg)
if keep {
keepers = append(keepers, callback)
}
}
t.subscribers = keepers
return nil return nil
} }