Compare commits

..

3 Commits

Author SHA1 Message Date
570e6317c0 use Example tests to demonstrate lib capabilities
All checks were successful
Quality / Test with Coverage (push) Successful in 54s
2025-11-20 17:07:32 +00:00
b251795d9d ability to stack onClose and onSubscribe options 2025-11-20 17:07:06 +00:00
cfda8bce17 deleted the feed due to severe issues found
i couldn't find a good enough solution so decided to remove entirely
2025-11-20 17:06:09 +00:00
10 changed files with 193 additions and 213 deletions

View File

@@ -71,11 +71,16 @@ GubGub offers 2 kinds of topics:
* **AsyncTopic** - Publishing schedules the message to be eventually delivered.
Subscribing schedules a subscriber to be eventually registered.
Only message delivery is guaranteed.
Message delivery is guaranteed but not the order.
The type of topic does not relate to how messages are actually delivered.
Currently we deliver messages sequentially (each subscriber gets the message one after the other).
## TODO
* AsyncTopic guaranteed message order delivery
* Parallel order delivery
## Benchmarks
* **SyncTopic** - Subscribers speed and number **will** have a direct impact the publishing performance.

View File

@@ -23,22 +23,14 @@ type AsyncTopic[T any] struct {
// NewAsyncTopic creates an AsyncTopic.
func NewAsyncTopic[T any](opts ...TopicOption) *AsyncTopic[T] {
options := TopicOptions{
onClose: func() {}, // Called after the Topic is closed and all messages have been delivered.
onSubscribe: func() {}, // Called everytime a new subscriber is added
}
for _, opt := range opts {
opt(&options)
}
t := AsyncTopic[T]{
options: options,
closed: make(chan struct{}),
publishCh: make(chan T, 1),
subscribeCh: make(chan Subscriber[T], 1),
}
t.SetOptions(opts...)
go t.run()
return &t
@@ -68,7 +60,7 @@ func (t *AsyncTopic[T]) Close() {
func (t *AsyncTopic[T]) run() {
defer close(t.closed)
defer t.options.onClose()
defer t.options.TriggerClose()
var subscribers []Subscriber[T]
@@ -91,7 +83,7 @@ func (t *AsyncTopic[T]) run() {
}
subscribers = append(subscribers, newCallback)
t.options.onSubscribe()
t.options.TriggerSubscribe()
case msg, more := <-t.publishCh:
if !more {
@@ -138,3 +130,7 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
return nil
}
func (t *AsyncTopic[T]) SetOptions(opts ...TopicOption) {
t.options.Apply(opts...)
}

View File

@@ -7,6 +7,8 @@ type Subscriber[T any] func(T) bool
type Topic[T any] interface {
Publishable[T]
Subscribable[T]
OptionsSetter
Closer
}
type Publishable[T any] interface {
@@ -16,3 +18,11 @@ type Publishable[T any] interface {
type Subscribable[T any] interface {
Subscribe(Subscriber[T]) error
}
type OptionsSetter interface {
SetOptions(...TopicOption)
}
type Closer interface {
Close()
}

View File

@@ -1,77 +0,0 @@
package main
import (
"bufio"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/nmoniz/gubgub"
)
func main() {
topic := gubgub.NewAsyncTopic[string]()
defer topic.Close()
err := topic.Subscribe(gubgub.Forever(UpperCaser))
if err != nil {
log.Fatal(err)
}
err = topic.Subscribe(Countdown(3))
if err != nil {
log.Fatal(err)
}
err = topic.Subscribe(gubgub.Buffered(gubgub.Forever(Slow)))
if err != nil {
log.Fatal(err)
}
go func() {
feed, err := gubgub.Feed(topic, false)
if err != nil {
log.Fatal(err)
}
for s := range feed {
log.Printf("ForRange: %s", s)
}
}()
fmt.Printf("Use 'Ctrl+C' to exit! Type a message followed by 'Enter' to publish it:\n")
scanner := bufio.NewScanner(os.Stdin)
for {
scanner.Scan()
err := scanner.Err()
if err != nil {
log.Fatal(err)
}
topic.Publish(scanner.Text())
}
}
func UpperCaser(input string) {
log.Printf("UpperCaser: %s", strings.ToUpper(input))
}
func Countdown(count int) gubgub.Subscriber[string] {
return func(_ string) bool {
count--
if count > 0 {
log.Printf("Countdown: %d...", count)
return true
} else {
log.Print("Countdown: I'm out!")
return false
}
}
}
func Slow(input string) {
time.Sleep(time.Second * 3)
log.Printf("Slow: %s", input)
}

76
examples_test.go Normal file
View File

@@ -0,0 +1,76 @@
package gubgub_test
import (
"fmt"
"sort"
"strings"
"github.com/nmoniz/gubgub"
)
func ExampleAsyncTopic() {
latch := make(chan struct{})
topic := gubgub.NewAsyncTopic[string](gubgub.WithOnSubscribe(func() { close(latch) }))
defer topic.Close() // It's ok to close a topic multiple times
receiver := make(chan string, 3) // closed later
_ = topic.Subscribe(gubgub.Forever(func(msg string) {
receiver <- strings.ToUpper(msg)
}))
<-latch // Wait for the subscribe to register
_ = topic.Publish("aaa")
_ = topic.Publish("bbb")
_ = topic.Publish("ccc")
topic.Close() // Close topic in order to wait for all messages to be delivered
close(receiver) // Now it's safe to close the receiver channel since no more messages will be delivered
msgLst := make([]string, 0, 3)
for msg := range receiver {
msgLst = append(msgLst, msg)
}
sort.Strings(msgLst) // Because publish order is not guaranteed with the AsyncTopic (yet) we need to sort results for consistent output
for _, msg := range msgLst {
fmt.Println(msg)
}
// Output: AAA
// BBB
// CCC
}
func ExampleSyncTopic() {
topic := gubgub.NewSyncTopic[string]()
defer topic.Close()
receiver := make(chan string)
defer close(receiver)
_ = topic.Subscribe(gubgub.Buffered(gubgub.Forever(func(msg string) {
receiver <- msg
})))
_ = topic.Publish("aaa")
_ = topic.Publish("bbb")
_ = topic.Publish("ccc")
topic.Close()
// Notice how despite the subscriber is blocked trying to push to an unbuffered channel it doesn't
// block publishing to a SyncTopic thanks to the Buffered subscriber. Messages are considered
// delivered even if they are in the buffer which is why Close returns.
for range 3 {
fmt.Println(<-receiver)
}
// Output: aaa
// bbb
// ccc
}

42
feed.go
View File

@@ -1,42 +0,0 @@
package gubgub
import (
"iter"
)
// Feed allows the usage of for/range loop to consume future published messages.
// The supporting subscriber will eventually be discarded after you exit the for loop.
func Feed[T any](t Subscribable[T], buffered bool) (iter.Seq[T], error) {
feed := make(chan T) // closed by the subscriber
unsubscribe := make(chan struct{}) // closed by the iterator
subscriber := func(msg T) bool {
select {
case feed <- msg:
return true
case <-unsubscribe:
close(feed)
return false
}
}
if buffered {
subscriber = Buffered(subscriber)
}
err := t.Subscribe(subscriber)
if err != nil {
return nil, err
}
// Iterator
return func(yield func(T) bool) {
defer close(unsubscribe)
for msg := range feed {
if !yield(msg) {
return
}
}
}, nil
}

View File

@@ -1,64 +0,0 @@
package gubgub
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestFeed_Topics(t *testing.T) {
const msgCount = 10
subscriberReady := make(chan struct{}, 1)
defer close(subscriberReady)
onSubscribe := WithOnSubscribe(func() {
subscriberReady <- struct{}{}
})
testCases := []struct {
name string
topic Topic[int]
}{
{
name: "sync topic",
topic: NewSyncTopic[int](onSubscribe),
},
{
name: "async topic",
topic: NewAsyncTopic[int](onSubscribe),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
feed, err := Feed(tc.topic, false)
require.NoError(t, err)
feedback := make(chan int)
go func() {
for i := range feed {
feedback <- i
}
}()
go func() {
<-subscriberReady
for i := range msgCount {
require.NoError(t, tc.topic.Publish(i))
}
}()
var counter int
timeout := testTimer(t, time.Second)
for counter < msgCount {
select {
case <-feedback:
counter++
case <-timeout.C:
t.Fatalf("expected %d feedback values by now but only got %d", msgCount, counter)
}
}
})
}
}

View File

@@ -1,11 +1,11 @@
package gubgub
import (
"sync"
)
import "sync"
// TopicOptions holds common options for topics.
type TopicOptions struct {
mu sync.Mutex
// onClose is called after the Topic is closed and all messages have been delivered. Even
// though you might call Close multiple times, topics are effectively closed only once thus
// this should be called only once.
@@ -15,16 +15,63 @@ type TopicOptions struct {
onSubscribe func()
}
func (to *TopicOptions) TriggerClose() {
to.mu.Lock()
defer to.mu.Unlock()
if to.onClose == nil {
return
}
to.onClose()
}
func (to *TopicOptions) TriggerSubscribe() {
to.mu.Lock()
defer to.mu.Unlock()
if to.onSubscribe == nil {
return
}
to.onSubscribe()
}
func (to *TopicOptions) Apply(opts ...TopicOption) {
to.mu.Lock()
defer to.mu.Unlock()
for _, opt := range opts {
opt(to)
}
}
type TopicOption func(*TopicOptions)
func WithOnClose(fn func()) TopicOption {
return func(opts *TopicOptions) {
opts.onClose = sync.OnceFunc(fn)
if opts.onClose == nil {
opts.onClose = fn
} else {
oldFn := opts.onClose // preserve previous onClose handler
opts.onClose = func() {
fn()
oldFn()
}
}
}
}
func WithOnSubscribe(fn func()) TopicOption {
return func(opts *TopicOptions) {
opts.onSubscribe = fn
if opts.onSubscribe == nil {
opts.onSubscribe = fn
} else {
oldFn := opts.onSubscribe // preserve previous onSubscribe handler
opts.onSubscribe = func() {
fn()
oldFn()
}
}
}
}

View File

@@ -7,6 +7,38 @@ import (
"github.com/stretchr/testify/assert"
)
func TestTriggerClose(t *testing.T) {
to := TopicOptions{}
var calls int
to.Apply(
WithOnClose(func() { calls++ }),
WithOnClose(func() { calls++ }),
WithOnClose(func() { calls++ }))
to.TriggerClose()
if calls != 3 {
t.Fatalf("wants 3 calls but got %d", calls)
}
}
func TestTriggerSubscribe(t *testing.T) {
to := TopicOptions{}
var calls int
to.Apply(
WithOnSubscribe(func() { calls++ }),
WithOnSubscribe(func() { calls++ }),
WithOnSubscribe(func() { calls++ }))
to.TriggerSubscribe()
if calls != 3 {
t.Fatalf("wants 3 calls but got %d", calls)
}
}
func TestWithOnClose(t *testing.T) {
type closable interface {
Close()

21
sync.go
View File

@@ -19,24 +19,17 @@ type SyncTopic[T any] struct {
// NewSyncTopic creates a SyncTopic with the specified options.
func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] {
options := TopicOptions{
onClose: func() {},
onSubscribe: func() {},
}
t := &SyncTopic[T]{}
for _, opt := range opts {
opt(&options)
}
t.SetOptions(opts...)
return &SyncTopic[T]{
options: options,
}
return t
}
// Close will prevent further publishing and subscribing.
func (t *SyncTopic[T]) Close() {
t.closed.Store(true)
t.options.onClose()
t.options.TriggerClose()
}
// Publish broadcasts a message to all subscribers.
@@ -63,7 +56,11 @@ func (t *SyncTopic[T]) Subscribe(fn Subscriber[T]) error {
defer t.mu.Unlock()
t.subscribers = append(t.subscribers, fn)
t.options.onSubscribe()
t.options.TriggerSubscribe()
return nil
}
func (t *SyncTopic[T]) SetOptions(opts ...TopicOption) {
t.options.Apply(opts...)
}