fix message delivery promise

This commit is contained in:
2024-09-05 10:09:30 +01:00
parent 6527e009e2
commit a887f9d800

View File

@@ -63,13 +63,22 @@ func (t *AsyncTopic[T]) run() {
var subscribers []Subscriber[T] var subscribers []Subscriber[T]
var drainedSubscribe, drainedPublish bool defer func() {
for !drainedSubscribe || !drainedPublish { // There are only one way to get here: the topic is now closed!
// Because both `subscribeCh` and `publishCh` channels are closed when the topic is closed
// this will always eventually return.
// This will deliver any potential queued message thus fulfilling the message delivery
// promise.
for msg := range t.publishCh {
subscribers = sequentialDelivery(msg, subscribers)
}
}()
for {
select { select {
case newCallback, more := <-t.subscribeCh: case newCallback, more := <-t.subscribeCh:
if !more { if !more {
drainedSubscribe = true return
break
} }
subscribers = append(subscribers, newCallback) subscribers = append(subscribers, newCallback)
@@ -77,8 +86,7 @@ func (t *AsyncTopic[T]) run() {
case msg, more := <-t.publishCh: case msg, more := <-t.publishCh:
if !more { if !more {
drainedPublish = true return
break
} }
subscribers = sequentialDelivery(msg, subscribers) subscribers = sequentialDelivery(msg, subscribers)