Compare commits
5 Commits
34e66307de
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 570e6317c0 | |||
| b251795d9d | |||
| cfda8bce17 | |||
| be2c327e8c | |||
| a9406cff13 |
31
.gitea/workflows/quality.yml
Normal file
31
.gitea/workflows/quality.yml
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
name: Quality
|
||||||
|
on: [push]
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
name: Test with Coverage
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Set up Go
|
||||||
|
uses: actions/setup-go@v2
|
||||||
|
with:
|
||||||
|
go-version: '1.25'
|
||||||
|
|
||||||
|
- name: Check out code
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
run: |
|
||||||
|
go mod download
|
||||||
|
|
||||||
|
- name: Run Unit tests
|
||||||
|
run: |
|
||||||
|
go test -race -covermode atomic -coverprofile=coverage.out ./...
|
||||||
|
|
||||||
|
- name: Install goveralls
|
||||||
|
run: go install github.com/mattn/goveralls@v0.0.12
|
||||||
|
|
||||||
|
- name: Send coverage
|
||||||
|
env:
|
||||||
|
COVERALLS_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN}}
|
||||||
|
run: goveralls -coverprofile=coverage.out -service=github
|
||||||
|
|
||||||
@@ -1,15 +0,0 @@
|
|||||||
image: golang:latest
|
|
||||||
|
|
||||||
stages:
|
|
||||||
- test
|
|
||||||
|
|
||||||
format:
|
|
||||||
stage: test
|
|
||||||
script:
|
|
||||||
- go fmt gitlab.com/naterciom/gubgub
|
|
||||||
- go vet gitlab.com/naterciom/gubgub
|
|
||||||
- go test -race -cover gitlab.com/naterciom/gubgub
|
|
||||||
coverage: '/coverage: \d+.\d+% of statements/'
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -71,11 +71,16 @@ GubGub offers 2 kinds of topics:
|
|||||||
|
|
||||||
* **AsyncTopic** - Publishing schedules the message to be eventually delivered.
|
* **AsyncTopic** - Publishing schedules the message to be eventually delivered.
|
||||||
Subscribing schedules a subscriber to be eventually registered.
|
Subscribing schedules a subscriber to be eventually registered.
|
||||||
Only message delivery is guaranteed.
|
Message delivery is guaranteed but not the order.
|
||||||
|
|
||||||
The type of topic does not relate to how messages are actually delivered.
|
The type of topic does not relate to how messages are actually delivered.
|
||||||
Currently we deliver messages sequentially (each subscriber gets the message one after the other).
|
Currently we deliver messages sequentially (each subscriber gets the message one after the other).
|
||||||
|
|
||||||
|
## TODO
|
||||||
|
|
||||||
|
* AsyncTopic guaranteed message order delivery
|
||||||
|
* Parallel order delivery
|
||||||
|
|
||||||
## Benchmarks
|
## Benchmarks
|
||||||
|
|
||||||
* **SyncTopic** - Subscribers speed and number **will** have a direct impact the publishing performance.
|
* **SyncTopic** - Subscribers speed and number **will** have a direct impact the publishing performance.
|
||||||
|
|||||||
20
async.go
20
async.go
@@ -23,22 +23,14 @@ type AsyncTopic[T any] struct {
|
|||||||
|
|
||||||
// NewAsyncTopic creates an AsyncTopic.
|
// NewAsyncTopic creates an AsyncTopic.
|
||||||
func NewAsyncTopic[T any](opts ...TopicOption) *AsyncTopic[T] {
|
func NewAsyncTopic[T any](opts ...TopicOption) *AsyncTopic[T] {
|
||||||
options := TopicOptions{
|
|
||||||
onClose: func() {}, // Called after the Topic is closed and all messages have been delivered.
|
|
||||||
onSubscribe: func() {}, // Called everytime a new subscriber is added
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(&options)
|
|
||||||
}
|
|
||||||
|
|
||||||
t := AsyncTopic[T]{
|
t := AsyncTopic[T]{
|
||||||
options: options,
|
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
publishCh: make(chan T, 1),
|
publishCh: make(chan T, 1),
|
||||||
subscribeCh: make(chan Subscriber[T], 1),
|
subscribeCh: make(chan Subscriber[T], 1),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.SetOptions(opts...)
|
||||||
|
|
||||||
go t.run()
|
go t.run()
|
||||||
|
|
||||||
return &t
|
return &t
|
||||||
@@ -68,7 +60,7 @@ func (t *AsyncTopic[T]) Close() {
|
|||||||
|
|
||||||
func (t *AsyncTopic[T]) run() {
|
func (t *AsyncTopic[T]) run() {
|
||||||
defer close(t.closed)
|
defer close(t.closed)
|
||||||
defer t.options.onClose()
|
defer t.options.TriggerClose()
|
||||||
|
|
||||||
var subscribers []Subscriber[T]
|
var subscribers []Subscriber[T]
|
||||||
|
|
||||||
@@ -91,7 +83,7 @@ func (t *AsyncTopic[T]) run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
subscribers = append(subscribers, newCallback)
|
subscribers = append(subscribers, newCallback)
|
||||||
t.options.onSubscribe()
|
t.options.TriggerSubscribe()
|
||||||
|
|
||||||
case msg, more := <-t.publishCh:
|
case msg, more := <-t.publishCh:
|
||||||
if !more {
|
if !more {
|
||||||
@@ -138,3 +130,7 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *AsyncTopic[T]) SetOptions(opts ...TopicOption) {
|
||||||
|
t.options.Apply(opts...)
|
||||||
|
}
|
||||||
|
|||||||
10
common.go
10
common.go
@@ -7,6 +7,8 @@ type Subscriber[T any] func(T) bool
|
|||||||
type Topic[T any] interface {
|
type Topic[T any] interface {
|
||||||
Publishable[T]
|
Publishable[T]
|
||||||
Subscribable[T]
|
Subscribable[T]
|
||||||
|
OptionsSetter
|
||||||
|
Closer
|
||||||
}
|
}
|
||||||
|
|
||||||
type Publishable[T any] interface {
|
type Publishable[T any] interface {
|
||||||
@@ -16,3 +18,11 @@ type Publishable[T any] interface {
|
|||||||
type Subscribable[T any] interface {
|
type Subscribable[T any] interface {
|
||||||
Subscribe(Subscriber[T]) error
|
Subscribe(Subscriber[T]) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type OptionsSetter interface {
|
||||||
|
SetOptions(...TopicOption)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Closer interface {
|
||||||
|
Close()
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,77 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/nmoniz/gubgub"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
topic := gubgub.NewAsyncTopic[string]()
|
|
||||||
defer topic.Close()
|
|
||||||
|
|
||||||
err := topic.Subscribe(gubgub.Forever(UpperCaser))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = topic.Subscribe(Countdown(3))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = topic.Subscribe(gubgub.Buffered(gubgub.Forever(Slow)))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
feed, err := gubgub.Feed(topic, false)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for s := range feed {
|
|
||||||
log.Printf("ForRange: %s", s)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
fmt.Printf("Use 'Ctrl+C' to exit! Type a message followed by 'Enter' to publish it:\n")
|
|
||||||
scanner := bufio.NewScanner(os.Stdin)
|
|
||||||
for {
|
|
||||||
scanner.Scan()
|
|
||||||
err := scanner.Err()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
topic.Publish(scanner.Text())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func UpperCaser(input string) {
|
|
||||||
log.Printf("UpperCaser: %s", strings.ToUpper(input))
|
|
||||||
}
|
|
||||||
|
|
||||||
func Countdown(count int) gubgub.Subscriber[string] {
|
|
||||||
return func(_ string) bool {
|
|
||||||
count--
|
|
||||||
if count > 0 {
|
|
||||||
log.Printf("Countdown: %d...", count)
|
|
||||||
return true
|
|
||||||
} else {
|
|
||||||
log.Print("Countdown: I'm out!")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Slow(input string) {
|
|
||||||
time.Sleep(time.Second * 3)
|
|
||||||
log.Printf("Slow: %s", input)
|
|
||||||
}
|
|
||||||
76
examples_test.go
Normal file
76
examples_test.go
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
package gubgub_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/nmoniz/gubgub"
|
||||||
|
)
|
||||||
|
|
||||||
|
func ExampleAsyncTopic() {
|
||||||
|
latch := make(chan struct{})
|
||||||
|
topic := gubgub.NewAsyncTopic[string](gubgub.WithOnSubscribe(func() { close(latch) }))
|
||||||
|
defer topic.Close() // It's ok to close a topic multiple times
|
||||||
|
|
||||||
|
receiver := make(chan string, 3) // closed later
|
||||||
|
|
||||||
|
_ = topic.Subscribe(gubgub.Forever(func(msg string) {
|
||||||
|
receiver <- strings.ToUpper(msg)
|
||||||
|
}))
|
||||||
|
|
||||||
|
<-latch // Wait for the subscribe to register
|
||||||
|
|
||||||
|
_ = topic.Publish("aaa")
|
||||||
|
_ = topic.Publish("bbb")
|
||||||
|
_ = topic.Publish("ccc")
|
||||||
|
|
||||||
|
topic.Close() // Close topic in order to wait for all messages to be delivered
|
||||||
|
|
||||||
|
close(receiver) // Now it's safe to close the receiver channel since no more messages will be delivered
|
||||||
|
|
||||||
|
msgLst := make([]string, 0, 3)
|
||||||
|
for msg := range receiver {
|
||||||
|
msgLst = append(msgLst, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Strings(msgLst) // Because publish order is not guaranteed with the AsyncTopic (yet) we need to sort results for consistent output
|
||||||
|
|
||||||
|
for _, msg := range msgLst {
|
||||||
|
fmt.Println(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Output: AAA
|
||||||
|
// BBB
|
||||||
|
// CCC
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleSyncTopic() {
|
||||||
|
topic := gubgub.NewSyncTopic[string]()
|
||||||
|
defer topic.Close()
|
||||||
|
|
||||||
|
receiver := make(chan string)
|
||||||
|
defer close(receiver)
|
||||||
|
|
||||||
|
_ = topic.Subscribe(gubgub.Buffered(gubgub.Forever(func(msg string) {
|
||||||
|
receiver <- msg
|
||||||
|
})))
|
||||||
|
|
||||||
|
_ = topic.Publish("aaa")
|
||||||
|
_ = topic.Publish("bbb")
|
||||||
|
_ = topic.Publish("ccc")
|
||||||
|
|
||||||
|
topic.Close()
|
||||||
|
|
||||||
|
// Notice how despite the subscriber is blocked trying to push to an unbuffered channel it doesn't
|
||||||
|
// block publishing to a SyncTopic thanks to the Buffered subscriber. Messages are considered
|
||||||
|
// delivered even if they are in the buffer which is why Close returns.
|
||||||
|
|
||||||
|
for range 3 {
|
||||||
|
fmt.Println(<-receiver)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Output: aaa
|
||||||
|
// bbb
|
||||||
|
// ccc
|
||||||
|
}
|
||||||
42
feed.go
42
feed.go
@@ -1,42 +0,0 @@
|
|||||||
package gubgub
|
|
||||||
|
|
||||||
import (
|
|
||||||
"iter"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Feed allows the usage of for/range loop to consume future published messages.
|
|
||||||
// The supporting subscriber will eventually be discarded after you exit the for loop.
|
|
||||||
func Feed[T any](t Subscribable[T], buffered bool) (iter.Seq[T], error) {
|
|
||||||
feed := make(chan T) // closed by the subscriber
|
|
||||||
unsubscribe := make(chan struct{}) // closed by the iterator
|
|
||||||
|
|
||||||
subscriber := func(msg T) bool {
|
|
||||||
select {
|
|
||||||
case feed <- msg:
|
|
||||||
return true
|
|
||||||
case <-unsubscribe:
|
|
||||||
close(feed)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if buffered {
|
|
||||||
subscriber = Buffered(subscriber)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := t.Subscribe(subscriber)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterator
|
|
||||||
return func(yield func(T) bool) {
|
|
||||||
defer close(unsubscribe)
|
|
||||||
|
|
||||||
for msg := range feed {
|
|
||||||
if !yield(msg) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
64
feed_test.go
64
feed_test.go
@@ -1,64 +0,0 @@
|
|||||||
package gubgub
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestFeed_Topics(t *testing.T) {
|
|
||||||
const msgCount = 10
|
|
||||||
|
|
||||||
subscriberReady := make(chan struct{}, 1)
|
|
||||||
defer close(subscriberReady)
|
|
||||||
|
|
||||||
onSubscribe := WithOnSubscribe(func() {
|
|
||||||
subscriberReady <- struct{}{}
|
|
||||||
})
|
|
||||||
|
|
||||||
testCases := []struct {
|
|
||||||
name string
|
|
||||||
topic Topic[int]
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "sync topic",
|
|
||||||
topic: NewSyncTopic[int](onSubscribe),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "async topic",
|
|
||||||
topic: NewAsyncTopic[int](onSubscribe),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for _, tc := range testCases {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
feed, err := Feed(tc.topic, false)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
feedback := make(chan int)
|
|
||||||
go func() {
|
|
||||||
for i := range feed {
|
|
||||||
feedback <- i
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
<-subscriberReady
|
|
||||||
for i := range msgCount {
|
|
||||||
require.NoError(t, tc.topic.Publish(i))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
var counter int
|
|
||||||
timeout := testTimer(t, time.Second)
|
|
||||||
for counter < msgCount {
|
|
||||||
select {
|
|
||||||
case <-feedback:
|
|
||||||
counter++
|
|
||||||
case <-timeout.C:
|
|
||||||
t.Fatalf("expected %d feedback values by now but only got %d", msgCount, counter)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
2
go.mod
2
go.mod
@@ -1,6 +1,6 @@
|
|||||||
module github.com/nmoniz/gubgub
|
module github.com/nmoniz/gubgub
|
||||||
|
|
||||||
go 1.24
|
go 1.25
|
||||||
|
|
||||||
require github.com/stretchr/testify v1.9.0
|
require github.com/stretchr/testify v1.9.0
|
||||||
|
|
||||||
|
|||||||
57
options.go
57
options.go
@@ -1,11 +1,11 @@
|
|||||||
package gubgub
|
package gubgub
|
||||||
|
|
||||||
import (
|
import "sync"
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TopicOptions holds common options for topics.
|
// TopicOptions holds common options for topics.
|
||||||
type TopicOptions struct {
|
type TopicOptions struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
|
||||||
// onClose is called after the Topic is closed and all messages have been delivered. Even
|
// onClose is called after the Topic is closed and all messages have been delivered. Even
|
||||||
// though you might call Close multiple times, topics are effectively closed only once thus
|
// though you might call Close multiple times, topics are effectively closed only once thus
|
||||||
// this should be called only once.
|
// this should be called only once.
|
||||||
@@ -15,16 +15,63 @@ type TopicOptions struct {
|
|||||||
onSubscribe func()
|
onSubscribe func()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (to *TopicOptions) TriggerClose() {
|
||||||
|
to.mu.Lock()
|
||||||
|
defer to.mu.Unlock()
|
||||||
|
|
||||||
|
if to.onClose == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
to.onClose()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (to *TopicOptions) TriggerSubscribe() {
|
||||||
|
to.mu.Lock()
|
||||||
|
defer to.mu.Unlock()
|
||||||
|
|
||||||
|
if to.onSubscribe == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
to.onSubscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (to *TopicOptions) Apply(opts ...TopicOption) {
|
||||||
|
to.mu.Lock()
|
||||||
|
defer to.mu.Unlock()
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(to)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type TopicOption func(*TopicOptions)
|
type TopicOption func(*TopicOptions)
|
||||||
|
|
||||||
func WithOnClose(fn func()) TopicOption {
|
func WithOnClose(fn func()) TopicOption {
|
||||||
return func(opts *TopicOptions) {
|
return func(opts *TopicOptions) {
|
||||||
opts.onClose = sync.OnceFunc(fn)
|
if opts.onClose == nil {
|
||||||
|
opts.onClose = fn
|
||||||
|
} else {
|
||||||
|
oldFn := opts.onClose // preserve previous onClose handler
|
||||||
|
opts.onClose = func() {
|
||||||
|
fn()
|
||||||
|
oldFn()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithOnSubscribe(fn func()) TopicOption {
|
func WithOnSubscribe(fn func()) TopicOption {
|
||||||
return func(opts *TopicOptions) {
|
return func(opts *TopicOptions) {
|
||||||
opts.onSubscribe = fn
|
if opts.onSubscribe == nil {
|
||||||
|
opts.onSubscribe = fn
|
||||||
|
} else {
|
||||||
|
oldFn := opts.onSubscribe // preserve previous onSubscribe handler
|
||||||
|
opts.onSubscribe = func() {
|
||||||
|
fn()
|
||||||
|
oldFn()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,38 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestTriggerClose(t *testing.T) {
|
||||||
|
to := TopicOptions{}
|
||||||
|
|
||||||
|
var calls int
|
||||||
|
to.Apply(
|
||||||
|
WithOnClose(func() { calls++ }),
|
||||||
|
WithOnClose(func() { calls++ }),
|
||||||
|
WithOnClose(func() { calls++ }))
|
||||||
|
|
||||||
|
to.TriggerClose()
|
||||||
|
|
||||||
|
if calls != 3 {
|
||||||
|
t.Fatalf("wants 3 calls but got %d", calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTriggerSubscribe(t *testing.T) {
|
||||||
|
to := TopicOptions{}
|
||||||
|
|
||||||
|
var calls int
|
||||||
|
to.Apply(
|
||||||
|
WithOnSubscribe(func() { calls++ }),
|
||||||
|
WithOnSubscribe(func() { calls++ }),
|
||||||
|
WithOnSubscribe(func() { calls++ }))
|
||||||
|
|
||||||
|
to.TriggerSubscribe()
|
||||||
|
|
||||||
|
if calls != 3 {
|
||||||
|
t.Fatalf("wants 3 calls but got %d", calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestWithOnClose(t *testing.T) {
|
func TestWithOnClose(t *testing.T) {
|
||||||
type closable interface {
|
type closable interface {
|
||||||
Close()
|
Close()
|
||||||
|
|||||||
21
sync.go
21
sync.go
@@ -19,24 +19,17 @@ type SyncTopic[T any] struct {
|
|||||||
|
|
||||||
// NewSyncTopic creates a SyncTopic with the specified options.
|
// NewSyncTopic creates a SyncTopic with the specified options.
|
||||||
func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] {
|
func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] {
|
||||||
options := TopicOptions{
|
t := &SyncTopic[T]{}
|
||||||
onClose: func() {},
|
|
||||||
onSubscribe: func() {},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, opt := range opts {
|
t.SetOptions(opts...)
|
||||||
opt(&options)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &SyncTopic[T]{
|
return t
|
||||||
options: options,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close will prevent further publishing and subscribing.
|
// Close will prevent further publishing and subscribing.
|
||||||
func (t *SyncTopic[T]) Close() {
|
func (t *SyncTopic[T]) Close() {
|
||||||
t.closed.Store(true)
|
t.closed.Store(true)
|
||||||
t.options.onClose()
|
t.options.TriggerClose()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish broadcasts a message to all subscribers.
|
// Publish broadcasts a message to all subscribers.
|
||||||
@@ -63,7 +56,11 @@ func (t *SyncTopic[T]) Subscribe(fn Subscriber[T]) error {
|
|||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
|
|
||||||
t.subscribers = append(t.subscribers, fn)
|
t.subscribers = append(t.subscribers, fn)
|
||||||
t.options.onSubscribe()
|
t.options.TriggerSubscribe()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *SyncTopic[T]) SetOptions(opts ...TopicOption) {
|
||||||
|
t.options.Apply(opts...)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user