introduced benchmarks on publishing messages
This commit is contained in:
29
README.md
29
README.md
@@ -48,3 +48,32 @@ func main() {
|
||||
<-ctx.Done()
|
||||
}
|
||||
```
|
||||
|
||||
## Topic Benchmarks
|
||||
|
||||
So far GubGub implements 2 kinds of topics:
|
||||
|
||||
* **SyncTopic** - Publishing blocks until the message was delivered.
|
||||
Subscribers speed and number **will** have a direct impact the publishing performance.
|
||||
Under the right conditions (few and fast subscribers) this is the most performant topic.
|
||||
|
||||
* **AsyncTopic** - Publishing schedules the message to be eventually delivered.
|
||||
Subscribers speed and number **will not** directly impact the publishing perfomance at the cost of some publishing overhead.
|
||||
This is generally the most scalable topic.
|
||||
|
||||
The following benchmarks are just for reference on how the number of subscribers and their speed impact the publishing performance:
|
||||
|
||||
```
|
||||
BenchmarkAsyncTopic_Publish/10_NoOp_Subscribers-8 2047338 498.7 ns/op
|
||||
BenchmarkAsyncTopic_Publish/100_NoOp_Subscribers-8 3317646 535.0 ns/op
|
||||
BenchmarkAsyncTopic_Publish/1K_NoOp_Subscribers-8 3239110 578.9 ns/op
|
||||
BenchmarkAsyncTopic_Publish/10K_NoOp_Subscribers-8 1871702 691.2 ns/op
|
||||
BenchmarkAsyncTopic_Publish/10_Slow_Subscribers-8 2615269 433.4 ns/op
|
||||
BenchmarkAsyncTopic_Publish/20_Slow_Subscribers-8 3127874 470.4 ns/op
|
||||
BenchmarkSyncTopic_Publish/10_NoOp_Subscribers-8 24740354 59.69 ns/op
|
||||
BenchmarkSyncTopic_Publish/100_NoOp_Subscribers-8 4135681 488.9 ns/op
|
||||
BenchmarkSyncTopic_Publish/1K_NoOp_Subscribers-8 474122 4320 ns/op
|
||||
BenchmarkSyncTopic_Publish/10K_NoOp_Subscribers-8 45790 35583 ns/op
|
||||
BenchmarkSyncTopic_Publish/10_Slow_Subscribers-8 357253 3393 ns/op
|
||||
BenchmarkSyncTopic_Publish/20_Slow_Subscribers-8 179725 6688 ns/op
|
||||
```
|
||||
|
||||
22
async.go
22
async.go
@@ -27,8 +27,8 @@ type AsyncTopic[T any] struct {
|
||||
// After closed calls to Publish or Subscribe will return an error.
|
||||
func NewAsyncTopic[T any](ctx context.Context, opts ...AsyncTopicOption) *AsyncTopic[T] {
|
||||
options := AsyncTopicOptions{
|
||||
onClose: func() {},
|
||||
onSubscribe: func(count int) {},
|
||||
onClose: func() {}, // Called after the Topic is closed and all messages have been delivered.
|
||||
onSubscribe: func(count int) {}, // Called everytime a new subscriber is added
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
@@ -56,23 +56,31 @@ func (t *AsyncTopic[T]) closer(ctx context.Context) {
|
||||
|
||||
close(t.publishCh)
|
||||
close(t.subscribeCh)
|
||||
|
||||
t.options.onClose()
|
||||
}
|
||||
|
||||
func (t *AsyncTopic[T]) run() {
|
||||
defer t.options.onClose()
|
||||
|
||||
var subscribers []Subscriber[T]
|
||||
|
||||
for {
|
||||
var drainedSubscribe, drainedPublish bool
|
||||
for !drainedSubscribe || !drainedPublish {
|
||||
select {
|
||||
case newCallback := <-t.subscribeCh:
|
||||
case newCallback, more := <-t.subscribeCh:
|
||||
if !more {
|
||||
// Ignore subscribeCh close. The publishCh will dictate when to exit this loop.
|
||||
drainedSubscribe = true
|
||||
break
|
||||
}
|
||||
|
||||
subscribers = append(subscribers, newCallback)
|
||||
t.options.onSubscribe(len(subscribers))
|
||||
|
||||
case msg, more := <-t.publishCh:
|
||||
if !more {
|
||||
// No more published messages, promise was fulfilled and we can return
|
||||
return
|
||||
drainedPublish = true
|
||||
break
|
||||
}
|
||||
|
||||
keepers := make([]Subscriber[T], 0, len(subscribers))
|
||||
|
||||
55
async_bench_test.go
Normal file
55
async_bench_test.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package gubgub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func BenchmarkAsyncTopic_Publish(b *testing.B) {
|
||||
for _, tc := range benchTestCase {
|
||||
b.Run(tc.Name, func(b *testing.B) {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
subscribersReady := make(chan struct{}, 1)
|
||||
defer close(subscribersReady)
|
||||
|
||||
topicClosed := make(chan struct{}, 1)
|
||||
defer close(topicClosed)
|
||||
|
||||
topic := NewAsyncTopic[int](ctx,
|
||||
WithOnSubscribe(func(count int) {
|
||||
if count == tc.Count {
|
||||
subscribersReady <- struct{}{}
|
||||
}
|
||||
}),
|
||||
WithOnClose(func() {
|
||||
topicClosed <- struct{}{}
|
||||
}),
|
||||
)
|
||||
|
||||
for range tc.Count {
|
||||
require.NoError(b, topic.Subscribe(tc.Subscriber))
|
||||
}
|
||||
|
||||
<-subscribersReady
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for i := range b.N {
|
||||
_ = topic.Publish(i)
|
||||
}
|
||||
|
||||
b.StopTimer()
|
||||
|
||||
cancel()
|
||||
|
||||
// This just helps leaving as few running Go routines as possible when the next round starts
|
||||
<-topicClosed
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
@@ -202,15 +202,17 @@ func TestAsyncTopic_ClosedTopicError(t *testing.T) {
|
||||
func TestAsyncTopic_AllPublishedBeforeClosedAreDeliveredAfterClosed(t *testing.T) {
|
||||
const msgCount = 10
|
||||
|
||||
subscriberReady := make(chan struct{}, 1)
|
||||
defer close(subscriberReady)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
topic := NewAsyncTopic[int](ctx, WithOnSubscribe(func(count int) {
|
||||
subscriberReady <- struct{}{}
|
||||
}))
|
||||
subscriberReady := make(chan struct{}, 1)
|
||||
defer close(subscriberReady)
|
||||
|
||||
topic := NewAsyncTopic[int](ctx,
|
||||
WithOnSubscribe(func(count int) {
|
||||
subscriberReady <- struct{}{}
|
||||
}),
|
||||
)
|
||||
|
||||
feedback := make(chan int) // unbuffered will cause choke point for publishers
|
||||
defer close(feedback)
|
||||
|
||||
47
bench_data_test.go
Normal file
47
bench_data_test.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package gubgub
|
||||
|
||||
type benchSubscriberSetup struct {
|
||||
Name string
|
||||
Count int
|
||||
Subscriber Subscriber[int]
|
||||
}
|
||||
|
||||
var benchTestCase = []benchSubscriberSetup{
|
||||
{
|
||||
Name: "10 NoOp Subscribers",
|
||||
Count: 10,
|
||||
Subscriber: NoOp[int](),
|
||||
},
|
||||
{
|
||||
Name: "100 NoOp Subscribers",
|
||||
Count: 100,
|
||||
Subscriber: NoOp[int](),
|
||||
},
|
||||
{
|
||||
Name: "1K NoOp Subscribers",
|
||||
Count: 1000,
|
||||
Subscriber: NoOp[int](),
|
||||
},
|
||||
{
|
||||
Name: "10K NoOp Subscribers",
|
||||
Count: 10000,
|
||||
Subscriber: NoOp[int](),
|
||||
},
|
||||
{
|
||||
Name: "10 Slow Subscribers",
|
||||
Count: 10,
|
||||
Subscriber: Slow,
|
||||
},
|
||||
{
|
||||
Name: "20 Slow Subscribers",
|
||||
Count: 20,
|
||||
Subscriber: Slow,
|
||||
},
|
||||
}
|
||||
|
||||
func Slow(int) bool {
|
||||
for i := 0; i < 1000; i++ {
|
||||
// Just count to 1000
|
||||
}
|
||||
return true
|
||||
}
|
||||
25
sync_bench_test.go
Normal file
25
sync_bench_test.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package gubgub
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func BenchmarkSyncTopic_Publish(b *testing.B) {
|
||||
for _, tc := range benchTestCase {
|
||||
b.Run(tc.Name, func(b *testing.B) {
|
||||
topic := NewSyncTopic[int]()
|
||||
|
||||
for range tc.Count {
|
||||
require.NoError(b, topic.Subscribe(tc.Subscriber))
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for i := range b.N {
|
||||
_ = topic.Publish(i)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -9,3 +9,8 @@ func Forever[T any](fn func(T)) Subscriber[T] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// NoOp creates a sbscriber that does absolutely nothing forever. This is mostly useful for testing.
|
||||
func NoOp[T any]() Subscriber[T] {
|
||||
return func(_ T) bool { return true }
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user