ability to stack onClose and onSubscribe options
This commit is contained in:
20
async.go
20
async.go
@@ -23,22 +23,14 @@ type AsyncTopic[T any] struct {
|
|||||||
|
|
||||||
// NewAsyncTopic creates an AsyncTopic.
|
// NewAsyncTopic creates an AsyncTopic.
|
||||||
func NewAsyncTopic[T any](opts ...TopicOption) *AsyncTopic[T] {
|
func NewAsyncTopic[T any](opts ...TopicOption) *AsyncTopic[T] {
|
||||||
options := TopicOptions{
|
|
||||||
onClose: func() {}, // Called after the Topic is closed and all messages have been delivered.
|
|
||||||
onSubscribe: func() {}, // Called everytime a new subscriber is added
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(&options)
|
|
||||||
}
|
|
||||||
|
|
||||||
t := AsyncTopic[T]{
|
t := AsyncTopic[T]{
|
||||||
options: options,
|
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
publishCh: make(chan T, 1),
|
publishCh: make(chan T, 1),
|
||||||
subscribeCh: make(chan Subscriber[T], 1),
|
subscribeCh: make(chan Subscriber[T], 1),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.SetOptions(opts...)
|
||||||
|
|
||||||
go t.run()
|
go t.run()
|
||||||
|
|
||||||
return &t
|
return &t
|
||||||
@@ -68,7 +60,7 @@ func (t *AsyncTopic[T]) Close() {
|
|||||||
|
|
||||||
func (t *AsyncTopic[T]) run() {
|
func (t *AsyncTopic[T]) run() {
|
||||||
defer close(t.closed)
|
defer close(t.closed)
|
||||||
defer t.options.onClose()
|
defer t.options.TriggerClose()
|
||||||
|
|
||||||
var subscribers []Subscriber[T]
|
var subscribers []Subscriber[T]
|
||||||
|
|
||||||
@@ -91,7 +83,7 @@ func (t *AsyncTopic[T]) run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
subscribers = append(subscribers, newCallback)
|
subscribers = append(subscribers, newCallback)
|
||||||
t.options.onSubscribe()
|
t.options.TriggerSubscribe()
|
||||||
|
|
||||||
case msg, more := <-t.publishCh:
|
case msg, more := <-t.publishCh:
|
||||||
if !more {
|
if !more {
|
||||||
@@ -138,3 +130,7 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *AsyncTopic[T]) SetOptions(opts ...TopicOption) {
|
||||||
|
t.options.Apply(opts...)
|
||||||
|
}
|
||||||
|
|||||||
10
common.go
10
common.go
@@ -7,6 +7,8 @@ type Subscriber[T any] func(T) bool
|
|||||||
type Topic[T any] interface {
|
type Topic[T any] interface {
|
||||||
Publishable[T]
|
Publishable[T]
|
||||||
Subscribable[T]
|
Subscribable[T]
|
||||||
|
OptionsSetter
|
||||||
|
Closer
|
||||||
}
|
}
|
||||||
|
|
||||||
type Publishable[T any] interface {
|
type Publishable[T any] interface {
|
||||||
@@ -16,3 +18,11 @@ type Publishable[T any] interface {
|
|||||||
type Subscribable[T any] interface {
|
type Subscribable[T any] interface {
|
||||||
Subscribe(Subscriber[T]) error
|
Subscribe(Subscriber[T]) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type OptionsSetter interface {
|
||||||
|
SetOptions(...TopicOption)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Closer interface {
|
||||||
|
Close()
|
||||||
|
}
|
||||||
|
|||||||
57
options.go
57
options.go
@@ -1,11 +1,11 @@
|
|||||||
package gubgub
|
package gubgub
|
||||||
|
|
||||||
import (
|
import "sync"
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TopicOptions holds common options for topics.
|
// TopicOptions holds common options for topics.
|
||||||
type TopicOptions struct {
|
type TopicOptions struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
|
||||||
// onClose is called after the Topic is closed and all messages have been delivered. Even
|
// onClose is called after the Topic is closed and all messages have been delivered. Even
|
||||||
// though you might call Close multiple times, topics are effectively closed only once thus
|
// though you might call Close multiple times, topics are effectively closed only once thus
|
||||||
// this should be called only once.
|
// this should be called only once.
|
||||||
@@ -15,16 +15,63 @@ type TopicOptions struct {
|
|||||||
onSubscribe func()
|
onSubscribe func()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (to *TopicOptions) TriggerClose() {
|
||||||
|
to.mu.Lock()
|
||||||
|
defer to.mu.Unlock()
|
||||||
|
|
||||||
|
if to.onClose == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
to.onClose()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (to *TopicOptions) TriggerSubscribe() {
|
||||||
|
to.mu.Lock()
|
||||||
|
defer to.mu.Unlock()
|
||||||
|
|
||||||
|
if to.onSubscribe == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
to.onSubscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (to *TopicOptions) Apply(opts ...TopicOption) {
|
||||||
|
to.mu.Lock()
|
||||||
|
defer to.mu.Unlock()
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(to)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type TopicOption func(*TopicOptions)
|
type TopicOption func(*TopicOptions)
|
||||||
|
|
||||||
func WithOnClose(fn func()) TopicOption {
|
func WithOnClose(fn func()) TopicOption {
|
||||||
return func(opts *TopicOptions) {
|
return func(opts *TopicOptions) {
|
||||||
opts.onClose = sync.OnceFunc(fn)
|
if opts.onClose == nil {
|
||||||
|
opts.onClose = fn
|
||||||
|
} else {
|
||||||
|
oldFn := opts.onClose // preserve previous onClose handler
|
||||||
|
opts.onClose = func() {
|
||||||
|
fn()
|
||||||
|
oldFn()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithOnSubscribe(fn func()) TopicOption {
|
func WithOnSubscribe(fn func()) TopicOption {
|
||||||
return func(opts *TopicOptions) {
|
return func(opts *TopicOptions) {
|
||||||
opts.onSubscribe = fn
|
if opts.onSubscribe == nil {
|
||||||
|
opts.onSubscribe = fn
|
||||||
|
} else {
|
||||||
|
oldFn := opts.onSubscribe // preserve previous onSubscribe handler
|
||||||
|
opts.onSubscribe = func() {
|
||||||
|
fn()
|
||||||
|
oldFn()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,38 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestTriggerClose(t *testing.T) {
|
||||||
|
to := TopicOptions{}
|
||||||
|
|
||||||
|
var calls int
|
||||||
|
to.Apply(
|
||||||
|
WithOnClose(func() { calls++ }),
|
||||||
|
WithOnClose(func() { calls++ }),
|
||||||
|
WithOnClose(func() { calls++ }))
|
||||||
|
|
||||||
|
to.TriggerClose()
|
||||||
|
|
||||||
|
if calls != 3 {
|
||||||
|
t.Fatalf("wants 3 calls but got %d", calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTriggerSubscribe(t *testing.T) {
|
||||||
|
to := TopicOptions{}
|
||||||
|
|
||||||
|
var calls int
|
||||||
|
to.Apply(
|
||||||
|
WithOnSubscribe(func() { calls++ }),
|
||||||
|
WithOnSubscribe(func() { calls++ }),
|
||||||
|
WithOnSubscribe(func() { calls++ }))
|
||||||
|
|
||||||
|
to.TriggerSubscribe()
|
||||||
|
|
||||||
|
if calls != 3 {
|
||||||
|
t.Fatalf("wants 3 calls but got %d", calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestWithOnClose(t *testing.T) {
|
func TestWithOnClose(t *testing.T) {
|
||||||
type closable interface {
|
type closable interface {
|
||||||
Close()
|
Close()
|
||||||
|
|||||||
21
sync.go
21
sync.go
@@ -19,24 +19,17 @@ type SyncTopic[T any] struct {
|
|||||||
|
|
||||||
// NewSyncTopic creates a SyncTopic with the specified options.
|
// NewSyncTopic creates a SyncTopic with the specified options.
|
||||||
func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] {
|
func NewSyncTopic[T any](opts ...TopicOption) *SyncTopic[T] {
|
||||||
options := TopicOptions{
|
t := &SyncTopic[T]{}
|
||||||
onClose: func() {},
|
|
||||||
onSubscribe: func() {},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, opt := range opts {
|
t.SetOptions(opts...)
|
||||||
opt(&options)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &SyncTopic[T]{
|
return t
|
||||||
options: options,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close will prevent further publishing and subscribing.
|
// Close will prevent further publishing and subscribing.
|
||||||
func (t *SyncTopic[T]) Close() {
|
func (t *SyncTopic[T]) Close() {
|
||||||
t.closed.Store(true)
|
t.closed.Store(true)
|
||||||
t.options.onClose()
|
t.options.TriggerClose()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish broadcasts a message to all subscribers.
|
// Publish broadcasts a message to all subscribers.
|
||||||
@@ -63,7 +56,11 @@ func (t *SyncTopic[T]) Subscribe(fn Subscriber[T]) error {
|
|||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
|
|
||||||
t.subscribers = append(t.subscribers, fn)
|
t.subscribers = append(t.subscribers, fn)
|
||||||
t.options.onSubscribe()
|
t.options.TriggerSubscribe()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *SyncTopic[T]) SetOptions(opts ...TopicOption) {
|
||||||
|
t.options.Apply(opts...)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user