deleted the feed due to severe issues found
i couldn't find a good enough solution so decided to remove entirely
This commit is contained in:
@@ -1,77 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/nmoniz/gubgub"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
topic := gubgub.NewAsyncTopic[string]()
|
|
||||||
defer topic.Close()
|
|
||||||
|
|
||||||
err := topic.Subscribe(gubgub.Forever(UpperCaser))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = topic.Subscribe(Countdown(3))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = topic.Subscribe(gubgub.Buffered(gubgub.Forever(Slow)))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
feed, err := gubgub.Feed(topic, false)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for s := range feed {
|
|
||||||
log.Printf("ForRange: %s", s)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
fmt.Printf("Use 'Ctrl+C' to exit! Type a message followed by 'Enter' to publish it:\n")
|
|
||||||
scanner := bufio.NewScanner(os.Stdin)
|
|
||||||
for {
|
|
||||||
scanner.Scan()
|
|
||||||
err := scanner.Err()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
topic.Publish(scanner.Text())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func UpperCaser(input string) {
|
|
||||||
log.Printf("UpperCaser: %s", strings.ToUpper(input))
|
|
||||||
}
|
|
||||||
|
|
||||||
func Countdown(count int) gubgub.Subscriber[string] {
|
|
||||||
return func(_ string) bool {
|
|
||||||
count--
|
|
||||||
if count > 0 {
|
|
||||||
log.Printf("Countdown: %d...", count)
|
|
||||||
return true
|
|
||||||
} else {
|
|
||||||
log.Print("Countdown: I'm out!")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Slow(input string) {
|
|
||||||
time.Sleep(time.Second * 3)
|
|
||||||
log.Printf("Slow: %s", input)
|
|
||||||
}
|
|
||||||
42
feed.go
42
feed.go
@@ -1,42 +0,0 @@
|
|||||||
package gubgub
|
|
||||||
|
|
||||||
import (
|
|
||||||
"iter"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Feed allows the usage of for/range loop to consume future published messages.
|
|
||||||
// The supporting subscriber will eventually be discarded after you exit the for loop.
|
|
||||||
func Feed[T any](t Subscribable[T], buffered bool) (iter.Seq[T], error) {
|
|
||||||
feed := make(chan T) // closed by the subscriber
|
|
||||||
unsubscribe := make(chan struct{}) // closed by the iterator
|
|
||||||
|
|
||||||
subscriber := func(msg T) bool {
|
|
||||||
select {
|
|
||||||
case feed <- msg:
|
|
||||||
return true
|
|
||||||
case <-unsubscribe:
|
|
||||||
close(feed)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if buffered {
|
|
||||||
subscriber = Buffered(subscriber)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := t.Subscribe(subscriber)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterator
|
|
||||||
return func(yield func(T) bool) {
|
|
||||||
defer close(unsubscribe)
|
|
||||||
|
|
||||||
for msg := range feed {
|
|
||||||
if !yield(msg) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
64
feed_test.go
64
feed_test.go
@@ -1,64 +0,0 @@
|
|||||||
package gubgub
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestFeed_Topics(t *testing.T) {
|
|
||||||
const msgCount = 10
|
|
||||||
|
|
||||||
subscriberReady := make(chan struct{}, 1)
|
|
||||||
defer close(subscriberReady)
|
|
||||||
|
|
||||||
onSubscribe := WithOnSubscribe(func() {
|
|
||||||
subscriberReady <- struct{}{}
|
|
||||||
})
|
|
||||||
|
|
||||||
testCases := []struct {
|
|
||||||
name string
|
|
||||||
topic Topic[int]
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "sync topic",
|
|
||||||
topic: NewSyncTopic[int](onSubscribe),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "async topic",
|
|
||||||
topic: NewAsyncTopic[int](onSubscribe),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for _, tc := range testCases {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
feed, err := Feed(tc.topic, false)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
feedback := make(chan int)
|
|
||||||
go func() {
|
|
||||||
for i := range feed {
|
|
||||||
feedback <- i
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
<-subscriberReady
|
|
||||||
for i := range msgCount {
|
|
||||||
require.NoError(t, tc.topic.Publish(i))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
var counter int
|
|
||||||
timeout := testTimer(t, time.Second)
|
|
||||||
for counter < msgCount {
|
|
||||||
select {
|
|
||||||
case <-feedback:
|
|
||||||
counter++
|
|
||||||
case <-timeout.C:
|
|
||||||
t.Fatalf("expected %d feedback values by now but only got %d", msgCount, counter)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user