improved mem & cpu consumption

This commit is contained in:
2024-08-22 11:54:57 +01:00
parent 938217dd64
commit 7fd9c030c0
10 changed files with 220 additions and 15 deletions

View File

@@ -3,6 +3,7 @@ package gubgub
import (
"context"
"fmt"
"iter"
"sync"
)
@@ -83,16 +84,7 @@ func (t *AsyncTopic[T]) run() {
break
}
keepers := make([]Subscriber[T], 0, len(subscribers))
for _, callback := range subscribers {
keep := callback(msg)
if keep {
keepers = append(keepers, callback)
}
}
subscribers = keepers
subscribers = sequentialDelivery(msg, subscribers)
}
}
}
@@ -131,6 +123,32 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
return nil
}
// Feed allows you to for/range to consume future published messages. The supporting subscriber will eventually be discarded after you exit the for loop.
func (t *AsyncTopic[T]) Feed() iter.Seq[T] {
feed := make(chan T, 1)
done := make(chan struct{})
t.Subscribe(func(msg T) bool {
select {
case feed <- msg:
return true
case <-done:
close(feed)
return false
}
})
return func(yield func(T) bool) {
defer close(done)
for msg := range feed {
if !yield(msg) {
return
}
}
}
}
type AsyncTopicOptions struct {
onClose func()
onSubscribe func(count int)

View File

@@ -8,7 +8,7 @@ import (
)
func BenchmarkAsyncTopic_Publish(b *testing.B) {
for _, tc := range benchTestCase {
for _, tc := range publishCases {
b.Run(tc.Name, func(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())

View File

@@ -2,6 +2,7 @@ package gubgub
import (
"context"
"sync"
"testing"
"time"
@@ -55,10 +56,13 @@ func TestAsyncTopic_MultiPublishersMultiSubscribers(t *testing.T) {
msgCount = pubCount * 100 // total messages to publish (delivered to EACH subscriber)
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
subscribersReady := make(chan struct{}, 1)
defer close(subscribersReady)
topic := NewAsyncTopic[int](context.Background(), WithOnSubscribe(func(count int) {
topic := NewAsyncTopic[int](ctx, WithOnSubscribe(func(count int) {
if count == subCount {
subscribersReady <- struct{}{}
}
@@ -245,6 +249,50 @@ func TestAsyncTopic_AllPublishedBeforeClosedAreDeliveredAfterClosed(t *testing.T
}
}
func TestAsyncTopic_Feed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
subscriberReady := make(chan struct{}, 1)
defer close(subscriberReady)
topic := NewAsyncTopic[int](ctx,
WithOnSubscribe(func(count int) {
subscriberReady <- struct{}{}
}),
)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
seen := make(map[int]struct{})
for i := range topic.Feed() {
seen[i] = struct{}{}
if len(seen) >= 9 {
return
}
}
}()
<-subscriberReady
wg.Add(1)
go func() {
defer wg.Done()
for i := range 10 {
topic.Publish(i)
}
}()
wg.Wait()
time.Sleep(time.Second)
}
func testTimer(t testing.TB, d time.Duration) *time.Timer {
t.Helper()

View File

@@ -1,12 +1,14 @@
package gubgub
import "sync/atomic"
type benchSubscriberSetup struct {
Name string
Count int
Subscriber Subscriber[int]
}
var benchTestCase = []benchSubscriberSetup{
var publishCases = []benchSubscriberSetup{
{
Name: "10 NoOp Subscribers",
Count: 10,
@@ -45,3 +47,44 @@ func Slow(int) bool {
}
return true
}
var deliveryCases = []benchSubscriberSetup{
{
Name: "10K Subscribers 0 unsubscribe",
Count: 10000,
Subscriber: NoOp[int](),
},
{
Name: "100K Subscribers 0 unsubscribe",
Count: 100000,
Subscriber: NoOp[int](),
},
{
Name: "10K Subscribers 1% unsubscribe",
Count: 10000,
Subscriber: Quiter(100),
},
{
Name: "100K Subscribers 1% unsubscribe",
Count: 100000,
Subscriber: Quiter(100),
},
{
Name: "10K Subscribers 2% unsubscribe",
Count: 10000,
Subscriber: Quiter(50),
},
{
Name: "100K Subscribers 2% unsubscribe",
Count: 100000,
Subscriber: Quiter(50),
},
}
// Quiter returns a subscriber that unsubscribes nth calls.
func Quiter(nth int64) func(_ int) bool {
var c atomic.Int64
return func(_ int) bool {
return c.Add(1)%nth != 0
}
}

27
delivery.go Normal file
View File

@@ -0,0 +1,27 @@
package gubgub
// sequentialDelivery delivers a message to each subscriber sequentially. For performance reasons
// this might mutate the subscribers slice inplace. Please overwrite it with the result of this
// call.
func sequentialDelivery[T any](msg T, subscribers []Subscriber[T]) []Subscriber[T] {
last := len(subscribers) - 1
next := 0
for next <= last {
if !subscribers[next](msg) {
for last > next && !subscribers[last](msg) {
last--
}
if last <= next {
break
}
subscribers[next] = subscribers[last]
last--
}
next++
}
return subscribers[:next]
}

28
delivery_bench_test.go Normal file
View File

@@ -0,0 +1,28 @@
package gubgub
import "testing"
func BenchmarkSequentialDelivery(b *testing.B) {
for _, tc := range deliveryCases {
b.Run(tc.Name, func(b *testing.B) {
subscribers := make([]Subscriber[int], 0, tc.Count)
for range tc.Count {
subscribers = append(subscribers, tc.Subscriber)
}
b.ResetTimer()
for i := range b.N {
b.StartTimer()
subscribers = sequentialDelivery(i, subscribers)
b.StopTimer()
// replenish subscribers
for len(subscribers) < tc.Count {
subscribers = append(subscribers, tc.Subscriber)
}
}
})
}
}

41
delivery_test.go Normal file
View File

@@ -0,0 +1,41 @@
package gubgub
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestSequentialDelivery(t *testing.T) {
const testMsg = 9786
feedback := make([]int, 0, 3)
subscribers := []Subscriber[int]{
func(i int) bool {
assert.Equalf(t, testMsg, i, "expected %d but got %d", testMsg, i)
feedback = append(feedback, 1)
return true
},
func(i int) bool {
assert.Equalf(t, testMsg, i, "expected %d but got %d", testMsg, i)
feedback = append(feedback, 2)
return false
},
func(i int) bool {
assert.Equalf(t, testMsg, i, "expected %d but got %d", testMsg, i)
feedback = append(feedback, 3)
return true
},
}
nextSubscribers := sequentialDelivery(testMsg, subscribers)
assert.Len(t, nextSubscribers, len(subscribers)-1, "expected to have one less subscriber")
assert.Len(t, feedback, 3, "one or more subscriber was not called")
finalSubscribers := sequentialDelivery(testMsg, nextSubscribers)
assert.Len(t, finalSubscribers, len(nextSubscribers), "expected to have the same subscribers")
assert.Len(t, feedback, 5, "one or more subscriber was not called")
}

2
go.mod
View File

@@ -1,6 +1,6 @@
module gitlab.com/naterciom/gubgub
go 1.22.5
go 1.23
require github.com/stretchr/testify v1.9.0

BIN
gubgub.test Executable file

Binary file not shown.

View File

@@ -7,7 +7,7 @@ import (
)
func BenchmarkSyncTopic_Publish(b *testing.B) {
for _, tc := range benchTestCase {
for _, tc := range publishCases {
b.Run(tc.Name, func(b *testing.B) {
topic := NewSyncTopic[int]()