further fixes to the async Close method
This commit is contained in:
13
async.go
13
async.go
@@ -47,17 +47,10 @@ func NewAsyncTopic[T any](opts ...TopicOption) *AsyncTopic[T] {
|
|||||||
// Close terminates background go routines and prevents further publishing and subscribing. All
|
// Close terminates background go routines and prevents further publishing and subscribing. All
|
||||||
// published messages are garanteed to be delivered once Close returns. This is idempotent.
|
// published messages are garanteed to be delivered once Close returns. This is idempotent.
|
||||||
func (t *AsyncTopic[T]) Close() {
|
func (t *AsyncTopic[T]) Close() {
|
||||||
t.mu.RLock()
|
|
||||||
closing := t.closing
|
|
||||||
t.mu.RUnlock()
|
|
||||||
if closing {
|
|
||||||
// It's either already closed or it's closing.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
if closing {
|
if t.closing {
|
||||||
// It is possible that 2 go routines attempted to aquire the lock to close this topic.
|
// Multiple go routines attempted to close this topic. Both should wait for the topic to be
|
||||||
|
// closed before returning.
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
<-t.closed
|
<-t.closed
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user