Compare commits

..

5 Commits

Author SHA1 Message Date
570e6317c0 use Example tests to demonstrate lib capabilities
All checks were successful
Quality / Test with Coverage (push) Successful in 54s
2025-11-20 17:07:32 +00:00
b251795d9d ability to stack onClose and onSubscribe options 2025-11-20 17:07:06 +00:00
cfda8bce17 deleted the feed due to severe issues found
i couldn't find a good enough solution so decided to remove entirely
2025-11-20 17:06:09 +00:00
be2c327e8c set project go version to v1.25
All checks were successful
Quality / Test with Coverage (push) Successful in 1m14s
2025-11-20 11:47:41 +00:00
a9406cff13 add gitea workflow for quality 2025-11-20 11:46:59 +00:00
13 changed files with 225 additions and 229 deletions

View File

@@ -0,0 +1,31 @@
name: Quality
on: [push]
jobs:
test:
name: Test with Coverage
runs-on: ubuntu-latest
steps:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: '1.25'
- name: Check out code
uses: actions/checkout@v2
- name: Install dependencies
run: |
go mod download
- name: Run Unit tests
run: |
go test -race -covermode atomic -coverprofile=coverage.out ./...
- name: Install goveralls
run: go install github.com/mattn/goveralls@v0.0.12
- name: Send coverage
env:
COVERALLS_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN}}
run: goveralls -coverprofile=coverage.out -service=github

View File

@@ -1,15 +0,0 @@
image: golang:latest
stages:
- test
format:
stage: test
script:
- go fmt gitlab.com/naterciom/gubgub
- go vet gitlab.com/naterciom/gubgub
- go test -race -cover gitlab.com/naterciom/gubgub
coverage: '/coverage: \d+.\d+% of statements/'

View File

@@ -71,11 +71,16 @@ GubGub offers 2 kinds of topics:
* **AsyncTopic** - Publishing schedules the message to be eventually delivered. * **AsyncTopic** - Publishing schedules the message to be eventually delivered.
Subscribing schedules a subscriber to be eventually registered. Subscribing schedules a subscriber to be eventually registered.
Only message delivery is guaranteed. Message delivery is guaranteed but not the order.
The type of topic does not relate to how messages are actually delivered. The type of topic does not relate to how messages are actually delivered.
Currently we deliver messages sequentially (each subscriber gets the message one after the other). Currently we deliver messages sequentially (each subscriber gets the message one after the other).
## TODO
* AsyncTopic guaranteed message order delivery
* Parallel order delivery
## Benchmarks ## Benchmarks
* **SyncTopic** - Subscribers speed and number **will** have a direct impact the publishing performance. * **SyncTopic** - Subscribers speed and number **will** have a direct impact the publishing performance.

View File

@@ -23,22 +23,14 @@ type AsyncTopic[T any] struct {
// NewAsyncTopic creates an AsyncTopic. // NewAsyncTopic creates an AsyncTopic.
func NewAsyncTopic[T any](opts ...TopicOption) *AsyncTopic[T] { func NewAsyncTopic[T any](opts ...TopicOption) *AsyncTopic[T] {
options := TopicOptions{
onClose: func() {}, // Called after the Topic is closed and all messages have been delivered.
onSubscribe: func() {}, // Called everytime a new subscriber is added
}
for _, opt := range opts {
opt(&options)
}
t := AsyncTopic[T]{ t := AsyncTopic[T]{
options: options,
closed: make(chan struct{}), closed: make(chan struct{}),
publishCh: make(chan T, 1), publishCh: make(chan T, 1),
subscribeCh: make(chan Subscriber[T], 1), subscribeCh: make(chan Subscriber[T], 1),
} }
t.SetOptions(opts...)
go t.run() go t.run()
return &t return &t
@@ -68,7 +60,7 @@ func (t *AsyncTopic[T]) Close() {
func (t *AsyncTopic[T]) run() { func (t *AsyncTopic[T]) run() {
defer close(t.closed) defer close(t.closed)
defer t.options.onClose() defer t.options.TriggerClose()
var subscribers []Subscriber[T] var subscribers []Subscriber[T]
@@ -91,7 +83,7 @@ func (t *AsyncTopic[T]) run() {
} }
subscribers = append(subscribers, newCallback) subscribers = append(subscribers, newCallback)
t.options.onSubscribe() t.options.TriggerSubscribe()
case msg, more := <-t.publishCh: case msg, more := <-t.publishCh:
if !more { if !more {
@@ -138,3 +130,7 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
return nil return nil
} }
func (t *AsyncTopic[T]) SetOptions(opts ...TopicOption) {
t.options.Apply(opts...)
}

View File

@@ -7,6 +7,8 @@ type Subscriber[T any] func(T) bool
type Topic[T any] interface { type Topic[T any] interface {
Publishable[T] Publishable[T]
Subscribable[T] Subscribable[T]
OptionsSetter
Closer
} }
type Publishable[T any] interface { type Publishable[T any] interface {
@@ -16,3 +18,11 @@ type Publishable[T any] interface {
type Subscribable[T any] interface { type Subscribable[T any] interface {
Subscribe(Subscriber[T]) error Subscribe(Subscriber[T]) error
} }
type OptionsSetter interface {
SetOptions(...TopicOption)
}
type Closer interface {
Close()
}

View File

@@ -1,77 +0,0 @@
package main
import (
"bufio"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/nmoniz/gubgub"
)
func main() {
topic := gubgub.NewAsyncTopic[string]()
defer topic.Close()
err := topic.Subscribe(gubgub.Forever(UpperCaser))
if err != nil {
log.Fatal(err)
}
err = topic.Subscribe(Countdown(3))
if err != nil {
log.Fatal(err)
}
err = topic.Subscribe(gubgub.Buffered(gubgub.Forever(Slow)))
if err != nil {
log.Fatal(err)
}
go func() {
feed, err := gubgub.Feed(topic, false)
if err != nil {
log.Fatal(err)
}
for s := range feed {
log.Printf("ForRange: %s", s)
}
}()
fmt.Printf("Use 'Ctrl+C' to exit! Type a message followed by 'Enter' to publish it:\n")
scanner := bufio.NewScanner(os.Stdin)
for {
scanner.Scan()
err := scanner.Err()
if err != nil {
log.Fatal(err)
}
topic.Publish(scanner.Text())
}
}
func UpperCaser(input string) {
log.Printf("UpperCaser: %s", strings.ToUpper(input))
}
func Countdown(count int) gubgub.Subscriber[string] {
return func(_ string) bool {
count--
if count > 0 {
log.Printf("Countdown: %d...", count)
return true
} else {
log.Print("Countdown: I'm out!")
return false
}
}
}
func Slow(input string) {
time.Sleep(time.Second * 3)
log.Printf("Slow: %s", input)
}

76
examples_test.go Normal file
View File

@@ -0,0 +1,76 @@
package gubgub_test
import (
"fmt"
"sort"
"strings"
"github.com/nmoniz/gubgub"
)
func ExampleAsyncTopic() {
latch := make(chan struct{})
topic := gubgub.NewAsyncTopic[string](gubgub.WithOnSubscribe(func() { close(latch) }))
defer topic.Close() // It's ok to close a topic multiple times
receiver := make(chan string, 3) // closed later
_ = topic.Subscribe(gubgub.Forever(func(msg string) {
receiver <- strings.ToUpper(msg)
}))
<-latch // Wait for the subscribe to register
_ = topic.Publish("aaa")
_ = topic.Publish("bbb")
_ = topic.Publish("ccc")
topic.Close() // Close topic in order to wait for all messages to be delivered
close(receiver) // Now it's safe to close the receiver channel since no more messages will be delivered
msgLst := make([]string, 0, 3)
for msg := range receiver {
msgLst = append(msgLst, msg)
}
sort.Strings(msgLst) // Because publish order is not guaranteed with the AsyncTopic (yet) we need to sort results for consistent output
for _, msg := range msgLst {
fmt.Println(msg)
}
// Output: AAA
// BBB
// CCC
}
func ExampleSyncTopic() {
topic := gubgub.NewSyncTopic[string]()
defer topic.Close()
receiver := make(chan string)
defer close(receiver)
_ = topic.Subscribe(gubgub.Buffered(gubgub.Forever(func(msg string) {
receiver <- msg
})))
_ = topic.Publish("aaa")
_ = topic.Publish("bbb")
_ = topic.Publish("ccc")
topic.Close()
// Notice how despite the subscriber is blocked trying to push to an unbuffered channel it doesn't
// block publishing to a SyncTopic thanks to the Buffered subscriber. Messages are considered
// delivered even if they are in the buffer which is why Close returns.
for range 3 {
fmt.Println(<-receiver)
}
// Output: aaa
// bbb
// ccc
}

42
feed.go
View File

@@ -1,42 +0,0 @@
package gubgub
import (
"iter"
)
// Feed allows the usage of for/range loop to consume future published messages.
// The supporting subscriber will eventually be discarded after you exit the for loop.
func Feed[T any](t Subscribable[T], buffered bool) (iter.Seq[T], error) {
feed := make(chan T) // closed by the subscriber
unsubscribe := make(chan struct{}) // closed by the iterator
subscriber := func(msg T) bool {
select {
case feed <- msg:
return true
case <-unsubscribe:
close(feed)
return false
}
}
if buffered {
subscriber = Buffered(subscriber)
}
err := t.Subscribe(subscriber)
if err != nil {
return nil, err
}
// Iterator
return func(yield func(T) bool) {
defer close(unsubscribe)
for msg := range feed {
if !yield(msg) {
return
}
}
}, nil
}

View File

@@ -1,64 +0,0 @@
package gubgub
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestFeed_Topics(t *testing.T) {
const msgCount = 10
subscriberReady := make(chan struct{}, 1)
defer close(subscriberReady)
onSubscribe := WithOnSubscribe(func() {
subscriberReady <- struct{}{}
})
testCases := []struct {
name string
topic Topic[int]
}{
{
name: "sync topic",
topic: NewSyncTopic[int](onSubscribe),
},
{
name: "async topic",
topic: NewAsyncTopic[int](onSubscribe),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
feed, err := Feed(tc.topic, false)
require.NoError(t, err)
feedback := make(chan int)
go func() {
for i := range feed {
feedback <- i
}
}()
go func() {
<-subscriberReady
for i := range msgCount {
require.NoError(t, tc.topic.Publish(i))
}
}()
var counter int
timeout := testTimer(t, time.Second)
for counter < msgCount {
select {
case <-feedback:
counter++
case <-timeout.C:
t.Fatalf("expected %d feedback values by now but only got %d", msgCount, counter)
}
}
})
}
}

2
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/nmoniz/gubgub module github.com/nmoniz/gubgub
go 1.24 go 1.25
require github.com/stretchr/testify v1.9.0 require github.com/stretchr/testify v1.9.0

View File

@@ -1,11 +1,11 @@
package gubgub package gubgub
import ( import "sync"
"sync"
)
// TopicOptions holds common options for topics. // TopicOptions holds common options for topics.
type TopicOptions struct { type TopicOptions struct {
mu sync.Mutex
// onClose is called after the Topic is closed and all messages have been delivered. Even // onClose is called after the Topic is closed and all messages have been delivered. Even
// though you might call Close multiple times, topics are effectively closed only once thus // though you might call Close multiple times, topics are effectively closed only once thus
// this should be called only once. // this should be called only once.
@@ -15,16 +15,63 @@ type TopicOptions struct {
onSubscribe func() onSubscribe func()
} }
func (to *TopicOptions) TriggerClose() {
to.mu.Lock()
defer to.mu.Unlock()
if to.onClose == nil {
return
}
to.onClose()
}
func (to *TopicOptions) TriggerSubscribe() {
to.mu.Lock()
defer to.mu.Unlock()
if to.onSubscribe == nil {
return
}
to.onSubscribe()
}
func (to *TopicOptions) Apply(opts ...TopicOption) {
to.mu.Lock()
defer to.mu.Unlock()
for _, opt := range opts {
opt(to)
}
}
type TopicOption func(*TopicOptions) type TopicOption func(*TopicOptions)
func WithOnClose(fn func()) TopicOption { func WithOnClose(fn func()) TopicOption {
return func(opts *TopicOptions) { return func(opts *TopicOptions) {
opts.onClose = sync.OnceFunc(fn) if opts.onClose == nil {
opts.onClose = fn
} else {
oldFn := opts.onClose // preserve previous onClose handler
opts.onClose = func() {
fn()
oldFn()
}
}
} }
} }
func WithOnSubscribe(fn func()) TopicOption { func WithOnSubscribe(fn func()) TopicOption {
return func(opts *TopicOptions) { return func(opts *TopicOptions) {
if opts.onSubscribe == nil {
opts.onSubscribe = fn opts.onSubscribe = fn
} else {
oldFn := opts.onSubscribe // preserve previous onSubscribe handler
opts.onSubscribe = func() {
fn()
oldFn()
}
}
} }
} }

View File

@@ -7,6 +7,38 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestTriggerClose(t *testing.T) {
to := TopicOptions{}
var calls int
to.Apply(
WithOnClose(func() { calls++ }),
WithOnClose(func() { calls++ }),
WithOnClose(func() { calls++ }))
to.TriggerClose()
if calls != 3 {
t.Fatalf("wants 3 calls but got %d", calls)
}
}
func TestTriggerSubscribe(t *testing.T) {
to := TopicOptions{}
var calls int
to.Apply(
WithOnSubscribe(func() { calls++ }),
WithOnSubscribe(func() { calls++ }),
WithOnSubscribe(func() { calls++ }))
to.TriggerSubscribe()
if calls != 3 {
t.Fatalf("wants 3 calls but got %d", calls)
}
}
func TestWithOnClose(t *testing.T) { func TestWithOnClose(t *testing.T) {
type closable interface { type closable interface {
Close() Close()

21
sync.go
View File

@@ -19,24 +19,17 @@ type SyncTopic[T any] struct {
// NewSyncTopic creates a SyncTopic with the specified options. // NewSyncTopic creates a SyncTopic with the specified options.
func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] { func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] {
options := TopicOptions{ t := &SyncTopic[T]{}
onClose: func() {},
onSubscribe: func() {},
}
for _, opt := range opts { t.SetOptions(opts...)
opt(&options)
}
return &SyncTopic[T]{ return t
options: options,
}
} }
// Close will prevent further publishing and subscribing. // Close will prevent further publishing and subscribing.
func (t *SyncTopic[T]) Close() { func (t *SyncTopic[T]) Close() {
t.closed.Store(true) t.closed.Store(true)
t.options.onClose() t.options.TriggerClose()
} }
// Publish broadcasts a message to all subscribers. // Publish broadcasts a message to all subscribers.
@@ -63,7 +56,11 @@ func (t *SyncTopic[T]) Subscribe(fn Subscriber[T]) error {
defer t.mu.Unlock() defer t.mu.Unlock()
t.subscribers = append(t.subscribers, fn) t.subscribers = append(t.subscribers, fn)
t.options.onSubscribe() t.options.TriggerSubscribe()
return nil return nil
} }
func (t *SyncTopic[T]) SetOptions(opts ...TopicOption) {
t.options.Apply(opts...)
}