allow for/range with Feed
This commit is contained in:
27
async.go
27
async.go
@@ -3,6 +3,7 @@ package gubgub
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"iter"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -128,6 +129,32 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Feed allows the usage of for/range to consume future published messages. The supporting subscriber will eventually be discarded after you exit the for loop.
|
||||||
|
func (t *AsyncTopic[T]) Feed() iter.Seq[T] {
|
||||||
|
feed := make(chan T, 1)
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
t.Subscribe(func(msg T) bool {
|
||||||
|
select {
|
||||||
|
case feed <- msg:
|
||||||
|
return true
|
||||||
|
case <-done:
|
||||||
|
close(feed)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
return func(yield func(T) bool) {
|
||||||
|
defer close(done)
|
||||||
|
|
||||||
|
for msg := range feed {
|
||||||
|
if !yield(msg) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type AsyncTopicOptions struct {
|
type AsyncTopicOptions struct {
|
||||||
onClose func()
|
onClose func()
|
||||||
onSubscribe func(count int)
|
onSubscribe func(count int)
|
||||||
|
|||||||
Reference in New Issue
Block a user