handle context cancelation

This commit is contained in:
2025-11-13 16:05:16 +00:00
parent 54fced39aa
commit f3d0f5d71a
7 changed files with 50 additions and 29 deletions

View File

@@ -1,6 +1,7 @@
package internal
import (
"context"
"errors"
"fmt"
"io"
@@ -10,40 +11,45 @@ import (
type RecordReader interface {
// ReadRecord should return Records until an error is found.
ReadRecord() (Record, error)
ReadRecord(context.Context) (Record, error)
}
type ReportWriter interface {
// ReportWriter writes report items
Write(ReportItem) error
Write(context.Context, ReportItem) error
}
func BuildReport(reader RecordReader, writer ReportWriter) error {
func BuildReport(ctx context.Context, reader RecordReader, writer ReportWriter) error {
buys := make(map[string]*RecordQueue)
for {
rec, err := reader.ReadRecord()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
select {
case <-ctx.Done():
return ctx.Err()
default:
rec, err := reader.ReadRecord(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
return err
}
buyQueue, ok := buys[rec.Symbol()]
if !ok {
buyQueue = new(RecordQueue)
buys[rec.Symbol()] = buyQueue
}
buyQueue, ok := buys[rec.Symbol()]
if !ok {
buyQueue = new(RecordQueue)
buys[rec.Symbol()] = buyQueue
}
err = processRecord(buyQueue, rec, writer)
if err != nil {
return fmt.Errorf("processing record: %w", err)
err = processRecord(ctx, buyQueue, rec, writer)
if err != nil {
return fmt.Errorf("processing record: %w", err)
}
}
}
}
func processRecord(q *RecordQueue, rec Record, writer ReportWriter) error {
func processRecord(ctx context.Context, q *RecordQueue, rec Record, writer ReportWriter) error {
switch rec.Side() {
case SideBuy:
q.Push(rec)
@@ -72,7 +78,7 @@ func processRecord(q *RecordQueue, rec Record, writer ReportWriter) error {
sellValue := new(big.Float).Mul(matchedQty, rec.Price())
buyValue := new(big.Float).Mul(matchedQty, buy.Price())
err := writer.Write(ReportItem{
err := writer.Write(ctx, ReportItem{
BuyValue: buyValue,
BuyTimestamp: buy.Timestamp(),
SellValue: sellValue,