diff --git a/example/main.go b/example/main.go deleted file mode 100644 index a5fffb3..0000000 --- a/example/main.go +++ /dev/null @@ -1,77 +0,0 @@ -package main - -import ( - "bufio" - "fmt" - "log" - "os" - "strings" - "time" - - "github.com/nmoniz/gubgub" -) - -func main() { - topic := gubgub.NewAsyncTopic[string]() - defer topic.Close() - - err := topic.Subscribe(gubgub.Forever(UpperCaser)) - if err != nil { - log.Fatal(err) - } - - err = topic.Subscribe(Countdown(3)) - if err != nil { - log.Fatal(err) - } - - err = topic.Subscribe(gubgub.Buffered(gubgub.Forever(Slow))) - if err != nil { - log.Fatal(err) - } - - go func() { - feed, err := gubgub.Feed(topic, false) - if err != nil { - log.Fatal(err) - } - - for s := range feed { - log.Printf("ForRange: %s", s) - } - }() - - fmt.Printf("Use 'Ctrl+C' to exit! Type a message followed by 'Enter' to publish it:\n") - scanner := bufio.NewScanner(os.Stdin) - for { - scanner.Scan() - err := scanner.Err() - if err != nil { - log.Fatal(err) - } - - topic.Publish(scanner.Text()) - } -} - -func UpperCaser(input string) { - log.Printf("UpperCaser: %s", strings.ToUpper(input)) -} - -func Countdown(count int) gubgub.Subscriber[string] { - return func(_ string) bool { - count-- - if count > 0 { - log.Printf("Countdown: %d...", count) - return true - } else { - log.Print("Countdown: I'm out!") - return false - } - } -} - -func Slow(input string) { - time.Sleep(time.Second * 3) - log.Printf("Slow: %s", input) -} diff --git a/feed.go b/feed.go deleted file mode 100644 index ec66c81..0000000 --- a/feed.go +++ /dev/null @@ -1,42 +0,0 @@ -package gubgub - -import ( - "iter" -) - -// Feed allows the usage of for/range loop to consume future published messages. -// The supporting subscriber will eventually be discarded after you exit the for loop. -func Feed[T any](t Subscribable[T], buffered bool) (iter.Seq[T], error) { - feed := make(chan T) // closed by the subscriber - unsubscribe := make(chan struct{}) // closed by the iterator - - subscriber := func(msg T) bool { - select { - case feed <- msg: - return true - case <-unsubscribe: - close(feed) - return false - } - } - - if buffered { - subscriber = Buffered(subscriber) - } - - err := t.Subscribe(subscriber) - if err != nil { - return nil, err - } - - // Iterator - return func(yield func(T) bool) { - defer close(unsubscribe) - - for msg := range feed { - if !yield(msg) { - return - } - } - }, nil -} diff --git a/feed_test.go b/feed_test.go deleted file mode 100644 index ffa44f6..0000000 --- a/feed_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package gubgub - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestFeed_Topics(t *testing.T) { - const msgCount = 10 - - subscriberReady := make(chan struct{}, 1) - defer close(subscriberReady) - - onSubscribe := WithOnSubscribe(func() { - subscriberReady <- struct{}{} - }) - - testCases := []struct { - name string - topic Topic[int] - }{ - { - name: "sync topic", - topic: NewSyncTopic[int](onSubscribe), - }, - { - name: "async topic", - topic: NewAsyncTopic[int](onSubscribe), - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - feed, err := Feed(tc.topic, false) - require.NoError(t, err) - - feedback := make(chan int) - go func() { - for i := range feed { - feedback <- i - } - }() - - go func() { - <-subscriberReady - for i := range msgCount { - require.NoError(t, tc.topic.Publish(i)) - } - }() - - var counter int - timeout := testTimer(t, time.Second) - for counter < msgCount { - select { - case <-feedback: - counter++ - case <-timeout.C: - t.Fatalf("expected %d feedback values by now but only got %d", msgCount, counter) - } - } - }) - } -}