Compare commits
3 Commits
be2c327e8c
...
570e6317c0
| Author | SHA1 | Date | |
|---|---|---|---|
| 570e6317c0 | |||
| b251795d9d | |||
| cfda8bce17 |
@@ -71,11 +71,16 @@ GubGub offers 2 kinds of topics:
|
||||
|
||||
* **AsyncTopic** - Publishing schedules the message to be eventually delivered.
|
||||
Subscribing schedules a subscriber to be eventually registered.
|
||||
Only message delivery is guaranteed.
|
||||
Message delivery is guaranteed but not the order.
|
||||
|
||||
The type of topic does not relate to how messages are actually delivered.
|
||||
Currently we deliver messages sequentially (each subscriber gets the message one after the other).
|
||||
|
||||
## TODO
|
||||
|
||||
* AsyncTopic guaranteed message order delivery
|
||||
* Parallel order delivery
|
||||
|
||||
## Benchmarks
|
||||
|
||||
* **SyncTopic** - Subscribers speed and number **will** have a direct impact the publishing performance.
|
||||
|
||||
20
async.go
20
async.go
@@ -23,22 +23,14 @@ type AsyncTopic[T any] struct {
|
||||
|
||||
// NewAsyncTopic creates an AsyncTopic.
|
||||
func NewAsyncTopic[T any](opts ...TopicOption) *AsyncTopic[T] {
|
||||
options := TopicOptions{
|
||||
onClose: func() {}, // Called after the Topic is closed and all messages have been delivered.
|
||||
onSubscribe: func() {}, // Called everytime a new subscriber is added
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&options)
|
||||
}
|
||||
|
||||
t := AsyncTopic[T]{
|
||||
options: options,
|
||||
closed: make(chan struct{}),
|
||||
publishCh: make(chan T, 1),
|
||||
subscribeCh: make(chan Subscriber[T], 1),
|
||||
}
|
||||
|
||||
t.SetOptions(opts...)
|
||||
|
||||
go t.run()
|
||||
|
||||
return &t
|
||||
@@ -68,7 +60,7 @@ func (t *AsyncTopic[T]) Close() {
|
||||
|
||||
func (t *AsyncTopic[T]) run() {
|
||||
defer close(t.closed)
|
||||
defer t.options.onClose()
|
||||
defer t.options.TriggerClose()
|
||||
|
||||
var subscribers []Subscriber[T]
|
||||
|
||||
@@ -91,7 +83,7 @@ func (t *AsyncTopic[T]) run() {
|
||||
}
|
||||
|
||||
subscribers = append(subscribers, newCallback)
|
||||
t.options.onSubscribe()
|
||||
t.options.TriggerSubscribe()
|
||||
|
||||
case msg, more := <-t.publishCh:
|
||||
if !more {
|
||||
@@ -138,3 +130,7 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *AsyncTopic[T]) SetOptions(opts ...TopicOption) {
|
||||
t.options.Apply(opts...)
|
||||
}
|
||||
|
||||
10
common.go
10
common.go
@@ -7,6 +7,8 @@ type Subscriber[T any] func(T) bool
|
||||
type Topic[T any] interface {
|
||||
Publishable[T]
|
||||
Subscribable[T]
|
||||
OptionsSetter
|
||||
Closer
|
||||
}
|
||||
|
||||
type Publishable[T any] interface {
|
||||
@@ -16,3 +18,11 @@ type Publishable[T any] interface {
|
||||
type Subscribable[T any] interface {
|
||||
Subscribe(Subscriber[T]) error
|
||||
}
|
||||
|
||||
type OptionsSetter interface {
|
||||
SetOptions(...TopicOption)
|
||||
}
|
||||
|
||||
type Closer interface {
|
||||
Close()
|
||||
}
|
||||
|
||||
@@ -1,77 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/nmoniz/gubgub"
|
||||
)
|
||||
|
||||
func main() {
|
||||
topic := gubgub.NewAsyncTopic[string]()
|
||||
defer topic.Close()
|
||||
|
||||
err := topic.Subscribe(gubgub.Forever(UpperCaser))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
err = topic.Subscribe(Countdown(3))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
err = topic.Subscribe(gubgub.Buffered(gubgub.Forever(Slow)))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
feed, err := gubgub.Feed(topic, false)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for s := range feed {
|
||||
log.Printf("ForRange: %s", s)
|
||||
}
|
||||
}()
|
||||
|
||||
fmt.Printf("Use 'Ctrl+C' to exit! Type a message followed by 'Enter' to publish it:\n")
|
||||
scanner := bufio.NewScanner(os.Stdin)
|
||||
for {
|
||||
scanner.Scan()
|
||||
err := scanner.Err()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
topic.Publish(scanner.Text())
|
||||
}
|
||||
}
|
||||
|
||||
func UpperCaser(input string) {
|
||||
log.Printf("UpperCaser: %s", strings.ToUpper(input))
|
||||
}
|
||||
|
||||
func Countdown(count int) gubgub.Subscriber[string] {
|
||||
return func(_ string) bool {
|
||||
count--
|
||||
if count > 0 {
|
||||
log.Printf("Countdown: %d...", count)
|
||||
return true
|
||||
} else {
|
||||
log.Print("Countdown: I'm out!")
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Slow(input string) {
|
||||
time.Sleep(time.Second * 3)
|
||||
log.Printf("Slow: %s", input)
|
||||
}
|
||||
76
examples_test.go
Normal file
76
examples_test.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package gubgub_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/nmoniz/gubgub"
|
||||
)
|
||||
|
||||
func ExampleAsyncTopic() {
|
||||
latch := make(chan struct{})
|
||||
topic := gubgub.NewAsyncTopic[string](gubgub.WithOnSubscribe(func() { close(latch) }))
|
||||
defer topic.Close() // It's ok to close a topic multiple times
|
||||
|
||||
receiver := make(chan string, 3) // closed later
|
||||
|
||||
_ = topic.Subscribe(gubgub.Forever(func(msg string) {
|
||||
receiver <- strings.ToUpper(msg)
|
||||
}))
|
||||
|
||||
<-latch // Wait for the subscribe to register
|
||||
|
||||
_ = topic.Publish("aaa")
|
||||
_ = topic.Publish("bbb")
|
||||
_ = topic.Publish("ccc")
|
||||
|
||||
topic.Close() // Close topic in order to wait for all messages to be delivered
|
||||
|
||||
close(receiver) // Now it's safe to close the receiver channel since no more messages will be delivered
|
||||
|
||||
msgLst := make([]string, 0, 3)
|
||||
for msg := range receiver {
|
||||
msgLst = append(msgLst, msg)
|
||||
}
|
||||
|
||||
sort.Strings(msgLst) // Because publish order is not guaranteed with the AsyncTopic (yet) we need to sort results for consistent output
|
||||
|
||||
for _, msg := range msgLst {
|
||||
fmt.Println(msg)
|
||||
}
|
||||
|
||||
// Output: AAA
|
||||
// BBB
|
||||
// CCC
|
||||
}
|
||||
|
||||
func ExampleSyncTopic() {
|
||||
topic := gubgub.NewSyncTopic[string]()
|
||||
defer topic.Close()
|
||||
|
||||
receiver := make(chan string)
|
||||
defer close(receiver)
|
||||
|
||||
_ = topic.Subscribe(gubgub.Buffered(gubgub.Forever(func(msg string) {
|
||||
receiver <- msg
|
||||
})))
|
||||
|
||||
_ = topic.Publish("aaa")
|
||||
_ = topic.Publish("bbb")
|
||||
_ = topic.Publish("ccc")
|
||||
|
||||
topic.Close()
|
||||
|
||||
// Notice how despite the subscriber is blocked trying to push to an unbuffered channel it doesn't
|
||||
// block publishing to a SyncTopic thanks to the Buffered subscriber. Messages are considered
|
||||
// delivered even if they are in the buffer which is why Close returns.
|
||||
|
||||
for range 3 {
|
||||
fmt.Println(<-receiver)
|
||||
}
|
||||
|
||||
// Output: aaa
|
||||
// bbb
|
||||
// ccc
|
||||
}
|
||||
42
feed.go
42
feed.go
@@ -1,42 +0,0 @@
|
||||
package gubgub
|
||||
|
||||
import (
|
||||
"iter"
|
||||
)
|
||||
|
||||
// Feed allows the usage of for/range loop to consume future published messages.
|
||||
// The supporting subscriber will eventually be discarded after you exit the for loop.
|
||||
func Feed[T any](t Subscribable[T], buffered bool) (iter.Seq[T], error) {
|
||||
feed := make(chan T) // closed by the subscriber
|
||||
unsubscribe := make(chan struct{}) // closed by the iterator
|
||||
|
||||
subscriber := func(msg T) bool {
|
||||
select {
|
||||
case feed <- msg:
|
||||
return true
|
||||
case <-unsubscribe:
|
||||
close(feed)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if buffered {
|
||||
subscriber = Buffered(subscriber)
|
||||
}
|
||||
|
||||
err := t.Subscribe(subscriber)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Iterator
|
||||
return func(yield func(T) bool) {
|
||||
defer close(unsubscribe)
|
||||
|
||||
for msg := range feed {
|
||||
if !yield(msg) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
64
feed_test.go
64
feed_test.go
@@ -1,64 +0,0 @@
|
||||
package gubgub
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestFeed_Topics(t *testing.T) {
|
||||
const msgCount = 10
|
||||
|
||||
subscriberReady := make(chan struct{}, 1)
|
||||
defer close(subscriberReady)
|
||||
|
||||
onSubscribe := WithOnSubscribe(func() {
|
||||
subscriberReady <- struct{}{}
|
||||
})
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
topic Topic[int]
|
||||
}{
|
||||
{
|
||||
name: "sync topic",
|
||||
topic: NewSyncTopic[int](onSubscribe),
|
||||
},
|
||||
{
|
||||
name: "async topic",
|
||||
topic: NewAsyncTopic[int](onSubscribe),
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
feed, err := Feed(tc.topic, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
feedback := make(chan int)
|
||||
go func() {
|
||||
for i := range feed {
|
||||
feedback <- i
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
<-subscriberReady
|
||||
for i := range msgCount {
|
||||
require.NoError(t, tc.topic.Publish(i))
|
||||
}
|
||||
}()
|
||||
|
||||
var counter int
|
||||
timeout := testTimer(t, time.Second)
|
||||
for counter < msgCount {
|
||||
select {
|
||||
case <-feedback:
|
||||
counter++
|
||||
case <-timeout.C:
|
||||
t.Fatalf("expected %d feedback values by now but only got %d", msgCount, counter)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
55
options.go
55
options.go
@@ -1,11 +1,11 @@
|
||||
package gubgub
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
import "sync"
|
||||
|
||||
// TopicOptions holds common options for topics.
|
||||
type TopicOptions struct {
|
||||
mu sync.Mutex
|
||||
|
||||
// onClose is called after the Topic is closed and all messages have been delivered. Even
|
||||
// though you might call Close multiple times, topics are effectively closed only once thus
|
||||
// this should be called only once.
|
||||
@@ -15,16 +15,63 @@ type TopicOptions struct {
|
||||
onSubscribe func()
|
||||
}
|
||||
|
||||
func (to *TopicOptions) TriggerClose() {
|
||||
to.mu.Lock()
|
||||
defer to.mu.Unlock()
|
||||
|
||||
if to.onClose == nil {
|
||||
return
|
||||
}
|
||||
|
||||
to.onClose()
|
||||
}
|
||||
|
||||
func (to *TopicOptions) TriggerSubscribe() {
|
||||
to.mu.Lock()
|
||||
defer to.mu.Unlock()
|
||||
|
||||
if to.onSubscribe == nil {
|
||||
return
|
||||
}
|
||||
|
||||
to.onSubscribe()
|
||||
}
|
||||
|
||||
func (to *TopicOptions) Apply(opts ...TopicOption) {
|
||||
to.mu.Lock()
|
||||
defer to.mu.Unlock()
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(to)
|
||||
}
|
||||
}
|
||||
|
||||
type TopicOption func(*TopicOptions)
|
||||
|
||||
func WithOnClose(fn func()) TopicOption {
|
||||
return func(opts *TopicOptions) {
|
||||
opts.onClose = sync.OnceFunc(fn)
|
||||
if opts.onClose == nil {
|
||||
opts.onClose = fn
|
||||
} else {
|
||||
oldFn := opts.onClose // preserve previous onClose handler
|
||||
opts.onClose = func() {
|
||||
fn()
|
||||
oldFn()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func WithOnSubscribe(fn func()) TopicOption {
|
||||
return func(opts *TopicOptions) {
|
||||
if opts.onSubscribe == nil {
|
||||
opts.onSubscribe = fn
|
||||
} else {
|
||||
oldFn := opts.onSubscribe // preserve previous onSubscribe handler
|
||||
opts.onSubscribe = func() {
|
||||
fn()
|
||||
oldFn()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,38 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestTriggerClose(t *testing.T) {
|
||||
to := TopicOptions{}
|
||||
|
||||
var calls int
|
||||
to.Apply(
|
||||
WithOnClose(func() { calls++ }),
|
||||
WithOnClose(func() { calls++ }),
|
||||
WithOnClose(func() { calls++ }))
|
||||
|
||||
to.TriggerClose()
|
||||
|
||||
if calls != 3 {
|
||||
t.Fatalf("wants 3 calls but got %d", calls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTriggerSubscribe(t *testing.T) {
|
||||
to := TopicOptions{}
|
||||
|
||||
var calls int
|
||||
to.Apply(
|
||||
WithOnSubscribe(func() { calls++ }),
|
||||
WithOnSubscribe(func() { calls++ }),
|
||||
WithOnSubscribe(func() { calls++ }))
|
||||
|
||||
to.TriggerSubscribe()
|
||||
|
||||
if calls != 3 {
|
||||
t.Fatalf("wants 3 calls but got %d", calls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWithOnClose(t *testing.T) {
|
||||
type closable interface {
|
||||
Close()
|
||||
|
||||
21
sync.go
21
sync.go
@@ -19,24 +19,17 @@ type SyncTopic[T any] struct {
|
||||
|
||||
// NewSyncTopic creates a SyncTopic with the specified options.
|
||||
func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] {
|
||||
options := TopicOptions{
|
||||
onClose: func() {},
|
||||
onSubscribe: func() {},
|
||||
}
|
||||
t := &SyncTopic[T]{}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&options)
|
||||
}
|
||||
t.SetOptions(opts...)
|
||||
|
||||
return &SyncTopic[T]{
|
||||
options: options,
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
// Close will prevent further publishing and subscribing.
|
||||
func (t *SyncTopic[T]) Close() {
|
||||
t.closed.Store(true)
|
||||
t.options.onClose()
|
||||
t.options.TriggerClose()
|
||||
}
|
||||
|
||||
// Publish broadcasts a message to all subscribers.
|
||||
@@ -63,7 +56,11 @@ func (t *SyncTopic[T]) Subscribe(fn Subscriber[T]) error {
|
||||
defer t.mu.Unlock()
|
||||
|
||||
t.subscribers = append(t.subscribers, fn)
|
||||
t.options.onSubscribe()
|
||||
t.options.TriggerSubscribe()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *SyncTopic[T]) SetOptions(opts ...TopicOption) {
|
||||
t.options.Apply(opts...)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user