From 9283306006029014bd4ed927dacd0ef5521bb5d9 Mon Sep 17 00:00:00 2001 From: Natercio Moniz Date: Mon, 9 Sep 2024 16:38:58 +0100 Subject: [PATCH] fixed error handling and docs --- README.md | 23 +++++++++++++++++------ example/main.go | 23 +++++++++++++++++++---- feed.go | 15 ++++++++++----- feed_test.go | 6 ++++-- wrappers.go | 2 ++ 5 files changed, 52 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index bbc2fc5..874111f 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ go get -u gitlab.com/naterciom/gubgub ## Example -Ignoring errors for code brevity! +We'll be ignoring errors for code brevity! ```Go package main @@ -32,22 +32,33 @@ func consumer(msg MyMessage) { fmt.Printf("Hello %s", msg.Name) } -func main() { +func m2ain() { topic := gubgub.NewAsyncTopic[MyMessage]() - defer topic.Close() // Returns after all messages are delivered + defer topic.Close() // Returns after all messages are delivered - _ := topic.Subscribe(gubgub.Forever(consumer)) + _ = topic.Subscribe(gubgub.Forever(consumer)) // The AsyncTopic doesn't wait for the subscriber to be registered so, for the purposes of this // example, we sleep on it. time.Sleep(time.Millisecond) - _ := topic.Publish(MyMessage{Name: "John Smith"}) // Returns immediately + _ = topic.Publish(MyMessage{Name: "John Smith"}) // Returns immediately } ``` + ## Topics -Topics are what this is all about. You publish to a topic and you subscribe to a topic. That is it. +Topics are what this is all about. +You publish to a topic and you subscribe to a topic. +That is it. + +A `Subscriber` is just a callback func. +A message is considered delivered when all subscribers have been called for that message and returned. + +If you `Publish` a message successfully (did not get an error) then you can be sure the message will be deliverd before any call to `Close` returns. + +Topics are meant to live as long as the application but you should call the `Close` method upon shutdown to fulfill the publishing promise. +Use the `WithOnClose` option when creating the topic to perform any extra clean up you might need to do if the topic is closed. GubGub offers 2 kinds of topics: diff --git a/example/main.go b/example/main.go index ad767ad..d8d2fdb 100644 --- a/example/main.go +++ b/example/main.go @@ -15,14 +15,28 @@ func main() { topic := gubgub.NewAsyncTopic[string]() defer topic.Close() - topic.Subscribe(gubgub.Forever(UpperCaser)) + err := topic.Subscribe(gubgub.Forever(UpperCaser)) + if err != nil { + log.Fatal(err) + } - topic.Subscribe(Countdown(3)) + err = topic.Subscribe(Countdown(3)) + if err != nil { + log.Fatal(err) + } - topic.Subscribe(gubgub.Buffered(gubgub.Forever(Slow))) + err = topic.Subscribe(gubgub.Buffered(gubgub.Forever(Slow))) + if err != nil { + log.Fatal(err) + } go func() { - for s := range gubgub.Feed(topic, false) { + feed, err := gubgub.Feed(topic, false) + if err != nil { + log.Fatal(err) + } + + for s := range feed { log.Printf("ForRange: %s", s) } }() @@ -35,6 +49,7 @@ func main() { if err != nil { log.Fatal(err) } + topic.Publish(scanner.Text()) } } diff --git a/feed.go b/feed.go index 4165038..ec66c81 100644 --- a/feed.go +++ b/feed.go @@ -1,10 +1,12 @@ package gubgub -import "iter" +import ( + "iter" +) -// Feed allows the usage of for/range to consume future published messages. +// Feed allows the usage of for/range loop to consume future published messages. // The supporting subscriber will eventually be discarded after you exit the for loop. -func Feed[T any](t Subscribable[T], buffered bool) iter.Seq[T] { +func Feed[T any](t Subscribable[T], buffered bool) (iter.Seq[T], error) { feed := make(chan T) // closed by the subscriber unsubscribe := make(chan struct{}) // closed by the iterator @@ -22,7 +24,10 @@ func Feed[T any](t Subscribable[T], buffered bool) iter.Seq[T] { subscriber = Buffered(subscriber) } - t.Subscribe(subscriber) + err := t.Subscribe(subscriber) + if err != nil { + return nil, err + } // Iterator return func(yield func(T) bool) { @@ -33,5 +38,5 @@ func Feed[T any](t Subscribable[T], buffered bool) iter.Seq[T] { return } } - } + }, nil } diff --git a/feed_test.go b/feed_test.go index 6494a64..641b67a 100644 --- a/feed_test.go +++ b/feed_test.go @@ -32,9 +32,12 @@ func TestFeed_Topics(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + feed, err := Feed(tc.topic, false) + require.NoError(t, err) + feedback := make(chan int) go func() { - for i := range Feed(tc.topic, false) { + for i := range feed { feedback <- i } }() @@ -58,5 +61,4 @@ func TestFeed_Topics(t *testing.T) { } }) } - } diff --git a/wrappers.go b/wrappers.go index df71a4a..dd39ff7 100644 --- a/wrappers.go +++ b/wrappers.go @@ -29,6 +29,8 @@ func NoOp[T any]() Subscriber[T] { // available memory. // This is useful if message publishing is surge prone and message processing is slow or // unpredictable (for example: subscriber makes network request). +// IMPORTANT: messages are considered delivered even it they are still in the buffer which means +// that buffered subscribers are NOT COVERED by the publishing promise. // Message average processing rate must still be higher than the average message publishing rate // otherwise it will eventually lead to memory issues. You will need to find a better strategy to // deal with such scenario.