use middleman to handle feeding messages
This commit is contained in:
61
async.go
61
async.go
@@ -129,26 +129,67 @@ func (t *AsyncTopic[T]) Subscribe(fn Subscriber[T]) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Feed allows the usage of for/range to consume future published messages. The supporting subscriber will eventually be discarded after you exit the for loop.
|
// Feed allows the usage of for/range to consume future published messages.
|
||||||
|
// The supporting subscriber will eventually be discarded after you exit the for loop.
|
||||||
func (t *AsyncTopic[T]) Feed() iter.Seq[T] {
|
func (t *AsyncTopic[T]) Feed() iter.Seq[T] {
|
||||||
feed := make(chan T, 1)
|
feed := make(chan T, 1) // closed by the middleman go routine
|
||||||
done := make(chan struct{})
|
messages := make(chan T, 1) // closed by the subscriber
|
||||||
|
yieldReady := make(chan struct{}) // closed by the iterator
|
||||||
|
unsubscribe := make(chan struct{}) // closed by the iterator
|
||||||
|
|
||||||
t.Subscribe(func(msg T) bool {
|
t.Subscribe(func(msg T) bool {
|
||||||
select {
|
select {
|
||||||
case feed <- msg:
|
case messages <- msg:
|
||||||
return true
|
return true
|
||||||
case <-done:
|
case <-unsubscribe:
|
||||||
close(feed)
|
close(messages)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
return func(yield func(T) bool) {
|
go func() {
|
||||||
defer close(done)
|
defer close(feed)
|
||||||
|
|
||||||
for msg := range feed {
|
q := make([]T, 0, 1)
|
||||||
if !yield(msg) {
|
waiting := false
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case m, more := <-messages:
|
||||||
|
if !more {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if waiting {
|
||||||
|
waiting = false
|
||||||
|
feed <- m
|
||||||
|
} else {
|
||||||
|
q = append(q, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
case _, more := <-yieldReady:
|
||||||
|
if !more {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(q) > 0 {
|
||||||
|
waiting = false
|
||||||
|
feed <- q[0]
|
||||||
|
q = q[1:]
|
||||||
|
} else {
|
||||||
|
waiting = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return func(yield func(T) bool) {
|
||||||
|
defer close(unsubscribe)
|
||||||
|
defer close(yieldReady)
|
||||||
|
|
||||||
|
for {
|
||||||
|
yieldReady <- struct{}{}
|
||||||
|
if !yield(<-feed) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user