package internal

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"time"

	"github.com/shopspring/decimal"
)

// Record is a single buy or sell record as returned by a RecordReader.
// BuildReport groups records by Symbol and dispatches on Kind.
type Record interface {
	Symbol() string
	Nature() Nature
	BrokerCountry() int64
	AssetCountry() int64
	Kind() Kind
	Price() decimal.Decimal
	Quantity() decimal.Decimal
	Timestamp() time.Time
	Fees() decimal.Decimal
	Taxes() decimal.Decimal
}

// RecordReader is the source of records consumed by BuildReport.
type RecordReader interface {
	// ReadRecord should return Records until an error is found.
	// BuildReport treats an error matching io.EOF as the normal end of input and returns any
	// other error to its caller.
	ReadRecord(context.Context) (Record, error)
}

// ReportItem is one line of the report: the portion of a sell that was matched against a single
// queued buy. The field notes below describe how processRecord populates them.
type ReportItem struct {
	// Symbol, BrokerCountry and AssetCountry are copied from the sell record.
	Symbol string
	// Nature is copied from the matched buy.
	Nature        Nature
	BrokerCountry int64
	AssetCountry  int64
	// BuyValue is the matched quantity multiplied by the buy price.
	BuyValue decimal.Decimal
	// BuyTimestamp is the timestamp of the matched buy.
	BuyTimestamp time.Time
	// SellValue is the matched quantity multiplied by the sell price.
	SellValue decimal.Decimal
	// SellTimestamp is the timestamp of the sell record.
	SellTimestamp time.Time
	// Fees is the sum of the matched buy's fees and the sell record's fees.
	Fees decimal.Decimal
	// Taxes is the sum of the matched buy's taxes and the sell record's taxes.
	Taxes decimal.Decimal
}

// RealisedPnL returns SellValue minus BuyValue. Fees and Taxes are not part of the result.
func (ri ReportItem) RealisedPnL() decimal.Decimal {
	return ri.SellValue.Sub(ri.BuyValue)
}

// ReportWriter is the destination for the report items produced by BuildReport.
type ReportWriter interface {
	// Write writes a single report item. A non-nil error aborts report building.
	Write(context.Context, ReportItem) error
}

// Selector returns true if a record should be selected for processing, false otherwise.
type Selector func(Record) bool

// BuildReport reads records from a RecordReader and, if the record passes the Selector, it is
// processed into the ReportWriter.
func BuildReport(ctx context.Context, reader RecordReader, writer ReportWriter, sel Selector) error { buys := make(map[string]*FillerQueue) var buysCount, sellsCount int64 var lastTimestamp time.Time progTicker := time.NewTicker(10 * time.Second) for { select { case <-ctx.Done(): return ctx.Err() case <-progTicker.C: slog.InfoContext(ctx, "Progress update", slog.Int64("total_records", buysCount+sellsCount), slog.Int64("sell_records", sellsCount), slog.Int64("buy_records", buysCount), slog.Time("last_record_timestamp", lastTimestamp), ) default: rec, err := reader.ReadRecord(ctx) if err != nil { if errors.Is(err, io.EOF) { return nil } return err } if rec.Kind().Is(KindBuy) { buysCount++ } else if rec.Kind().Is(KindSell) { sellsCount++ } lastTimestamp = rec.Timestamp() buyQueue, ok := buys[rec.Symbol()] if !ok { buyQueue = new(FillerQueue) buys[rec.Symbol()] = buyQueue } err = processRecord(ctx, buyQueue, rec, sel, writer) if err != nil { return fmt.Errorf("processing record: %w", err) } } } } // processRecord either adds buys to the queue or consumes buys from the queue when processing a // sell record. // Selectors are only applied on sells for performance reasons. 
It's much cheaper to just accumulate // buys and only actually inspect a record once a sell happens due to potential network requests to func processRecord(ctx context.Context, q *FillerQueue, rec Record, sel Selector, writer ReportWriter) error { slog.Debug("Report: processing record", slog.String("symbol", rec.Symbol()), slog.String("side", rec.Kind().String()), ) switch rec.Kind() { case KindBuy: q.Push(NewFiller(rec)) case KindSell: if !sel(rec) { slog.Debug("Report: skipping record", slog.String("symbol", rec.Symbol()), slog.String("side", rec.Kind().String()), ) return nil } unmatchedQty := rec.Quantity() for unmatchedQty.IsPositive() { buy, ok := q.Peek() if !ok { return ErrInsufficientBoughtVolume } matchedQty, filled := buy.Fill(unmatchedQty) if filled { _, ok := q.Pop() if !ok { return fmt.Errorf("pop empty filler queue") } } unmatchedQty = unmatchedQty.Sub(matchedQty) buyValue := matchedQty.Mul(buy.Price()) sellValue := matchedQty.Mul(rec.Price()) err := writer.Write(ctx, ReportItem{ Symbol: rec.Symbol(), BrokerCountry: rec.BrokerCountry(), AssetCountry: rec.AssetCountry(), BuyValue: buyValue, BuyTimestamp: buy.Timestamp(), SellValue: sellValue, SellTimestamp: rec.Timestamp(), Fees: buy.Fees().Add(rec.Fees()), Taxes: buy.Taxes().Add(rec.Taxes()), Nature: buy.Nature(), }) if err != nil { return fmt.Errorf("write report item: %w", err) } } default: return fmt.Errorf("unknown side: %v", rec.Kind()) } return nil }