refactor some tests
This commit is contained in:
@@ -32,7 +32,7 @@ func consumer(msg MyMessage) {
|
|||||||
fmt.Printf("Hello %s", msg.Name)
|
fmt.Printf("Hello %s", msg.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func m2ain() {
|
func main() {
|
||||||
topic := gubgub.NewAsyncTopic[MyMessage]()
|
topic := gubgub.NewAsyncTopic[MyMessage]()
|
||||||
defer topic.Close() // Returns after all messages are delivered
|
defer topic.Close() // Returns after all messages are delivered
|
||||||
|
|
||||||
|
|||||||
@@ -10,7 +10,8 @@ func BenchmarkAsyncTopic_Publish(b *testing.B) {
|
|||||||
for _, tc := range publishCases {
|
for _, tc := range publishCases {
|
||||||
b.Run(tc.Name, func(b *testing.B) {
|
b.Run(tc.Name, func(b *testing.B) {
|
||||||
onSubscribe, subscribersReady := withNotifyOnNthSubscriber(b, int64(tc.Count))
|
onSubscribe, subscribersReady := withNotifyOnNthSubscriber(b, int64(tc.Count))
|
||||||
topic := newTestAsyncTopic[int](b, onSubscribe)
|
topic := NewAsyncTopic[int](onSubscribe)
|
||||||
|
b.Cleanup(topic.Close)
|
||||||
|
|
||||||
for range tc.Count {
|
for range tc.Count {
|
||||||
require.NoError(b, topic.Subscribe(tc.Subscriber))
|
require.NoError(b, topic.Subscribe(tc.Subscriber))
|
||||||
|
|||||||
@@ -13,7 +13,8 @@ func TestAsyncTopic_SinglePublisherSingleSubscriber(t *testing.T) {
|
|||||||
const msgCount = 10
|
const msgCount = 10
|
||||||
|
|
||||||
onSubscribe, subscriberReady := withNotifyOnNthSubscriber(t, 1)
|
onSubscribe, subscriberReady := withNotifyOnNthSubscriber(t, 1)
|
||||||
topic := newTestAsyncTopic[int](t, onSubscribe)
|
topic := NewAsyncTopic[int](onSubscribe)
|
||||||
|
t.Cleanup(topic.Close)
|
||||||
|
|
||||||
feedback := make(chan struct{}, msgCount)
|
feedback := make(chan struct{}, msgCount)
|
||||||
defer close(feedback)
|
defer close(feedback)
|
||||||
@@ -52,7 +53,8 @@ func TestAsyncTopic_MultiPublishersMultiSubscribers(t *testing.T) {
|
|||||||
)
|
)
|
||||||
|
|
||||||
onSubscribe, subscribersReady := withNotifyOnNthSubscriber(t, subCount)
|
onSubscribe, subscribersReady := withNotifyOnNthSubscriber(t, subCount)
|
||||||
topic := newTestAsyncTopic[int](t, onSubscribe)
|
topic := NewAsyncTopic[int](onSubscribe)
|
||||||
|
t.Cleanup(topic.Close)
|
||||||
|
|
||||||
expFeedbackCount := msgCount * subCount
|
expFeedbackCount := msgCount * subCount
|
||||||
feedback := make(chan int, expFeedbackCount)
|
feedback := make(chan int, expFeedbackCount)
|
||||||
@@ -172,11 +174,12 @@ func TestAsyncTopic_ClosedTopicError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAsyncTopic_AllPublishedBeforeClosedAreDeliveredAfterClosed(t *testing.T) {
|
func TestAsyncTopic_AllPublishedBeforeClosedAreDelivered(t *testing.T) {
|
||||||
const msgCount = 10
|
const msgCount = 10
|
||||||
|
|
||||||
onSubscribe, subscriberReady := withNotifyOnNthSubscriber(t, 1)
|
onSubscribe, subscriberReady := withNotifyOnNthSubscriber(t, 1)
|
||||||
topic := newTestAsyncTopic[int](t, onSubscribe)
|
topic := NewAsyncTopic[int](onSubscribe)
|
||||||
|
t.Cleanup(topic.Close)
|
||||||
|
|
||||||
feedback := make(chan int) // unbuffered will cause choke point for publishers
|
feedback := make(chan int) // unbuffered will cause choke point for publishers
|
||||||
defer close(feedback)
|
defer close(feedback)
|
||||||
@@ -220,13 +223,6 @@ func testTimer(t testing.TB, d time.Duration) *time.Timer {
|
|||||||
return timer
|
return timer
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestAsyncTopic[T any](t testing.TB, opts ...TopicOption) *AsyncTopic[T] {
|
|
||||||
t.Helper()
|
|
||||||
topic := NewAsyncTopic[T](opts...)
|
|
||||||
t.Cleanup(topic.Close)
|
|
||||||
return topic
|
|
||||||
}
|
|
||||||
|
|
||||||
func withNotifyOnNthSubscriber(t testing.TB, n int64) (TopicOption, <-chan struct{}) {
|
func withNotifyOnNthSubscriber(t testing.TB, n int64) (TopicOption, <-chan struct{}) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user