Files
any2anexoj/internal/report.go
Natercio Moniz d371aca767
All checks were successful
Generate check / check-changes (pull_request) Successful in 3s
Quality / check-changes (pull_request) Successful in 3s
Generate check / verify-generate (pull_request) Has been skipped
Quality / run-tests (pull_request) Successful in 13s
apply selectors only on sells for performance
2026-05-16 11:58:35 +01:00

177 lines
4.0 KiB
Go

package internal
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"time"
"github.com/shopspring/decimal"
)
// Record is a single trade as consumed by BuildReport. Buys are queued per symbol and sells are
// matched against the queued buys to produce ReportItems.
type Record interface {
// Symbol identifies the traded asset; BuildReport keeps one buy queue per symbol.
Symbol() string
// Nature is copied from the matched buy into the resulting ReportItem.
Nature() Nature
// BrokerCountry is copied from the sell into the resulting ReportItem.
// NOTE(review): presumably a numeric country code — confirm the encoding.
BrokerCountry() int64
// AssetCountry is copied from the sell into the resulting ReportItem.
// NOTE(review): presumably a numeric country code — confirm the encoding.
AssetCountry() int64
// Side tells whether the record is a buy or a sell.
Side() Side
// Price is multiplied by the matched quantity to obtain the buy or sell value.
Price() decimal.Decimal
// Quantity is the amount traded; for a sell it is the amount that must be matched against buys.
Quantity() decimal.Decimal
// Timestamp is when the trade happened; it is reported as the buy or sell timestamp.
Timestamp() time.Time
// Fees are added (buy + sell) into ReportItem.Fees.
Fees() decimal.Decimal
// Taxes are added (buy + sell) into ReportItem.Taxes.
Taxes() decimal.Decimal
}
// RecordReader is the source of Records for BuildReport.
type RecordReader interface {
// ReadRecord returns the next Record on each call until an error is found.
// BuildReport treats io.EOF as the normal end of input and returns any other error to its
// caller.
ReadRecord(context.Context) (Record, error)
}
// ReportItem is one matched (buy, sell) pair as written by processRecord. A single sell produces
// one ReportItem per buy it is matched against.
type ReportItem struct {
// Symbol is taken from the sell record.
Symbol string
// Nature is taken from the matched buy.
Nature Nature
// BrokerCountry is taken from the sell record.
BrokerCountry int64
// AssetCountry is taken from the sell record.
AssetCountry int64
// BuyValue is the matched quantity multiplied by the buy price.
BuyValue decimal.Decimal
// BuyTimestamp is the timestamp of the matched buy.
BuyTimestamp time.Time
// SellValue is the matched quantity multiplied by the sell price.
SellValue decimal.Decimal
// SellTimestamp is the timestamp of the sell record.
SellTimestamp time.Time
// Fees is the sum of the buy's fees and the sell's fees.
Fees decimal.Decimal
// Taxes is the sum of the buy's taxes and the sell's taxes.
Taxes decimal.Decimal
}
// RealisedPnL returns the realised profit or loss of the item, computed as SellValue minus
// BuyValue. Fees and Taxes are not part of the calculation.
func (ri ReportItem) RealisedPnL() decimal.Decimal {
	pnl := ri.SellValue.Sub(ri.BuyValue)
	return pnl
}
// ReportWriter is the destination for the ReportItems produced by BuildReport.
type ReportWriter interface {
// Write writes a single report item. A non-nil error aborts BuildReport.
Write(context.Context, ReportItem) error
}
// Selector returns true if a record should be selected for processing, false otherwise.
// BuildReport only consults it for sell records; buys are always queued.
type Selector func(Record) bool
// BuildReport reads records from reader and processes each one with processRecord, which queues
// buys per symbol and, for sells that pass sel, writes the matched ReportItems to writer.
//
// Between records it checks ctx for cancellation and, at most once every 10 seconds, logs a
// progress line with the number of records seen so far.
//
// It returns nil once reader reports io.EOF, ctx.Err() if ctx is done, any other reader error
// unchanged, and a wrapped error when a record cannot be processed.
func BuildReport(ctx context.Context, reader RecordReader, writer ReportWriter, sel Selector) error {
	// One queue of pending buys per symbol.
	buys := make(map[string]*FillerQueue)

	var buysCount, sellsCount int64
	var lastTimestamp time.Time

	progTicker := time.NewTicker(10 * time.Second)
	// Stop the ticker on every return path so its resources are released promptly.
	defer progTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-progTicker.C:
			slog.InfoContext(ctx, "Progress update",
				slog.Int64("total_records", buysCount+sellsCount),
				slog.Int64("sell_records", sellsCount),
				slog.Int64("buy_records", buysCount),
				slog.Time("last_record_timestamp", lastTimestamp),
			)

		default:
			// Neither cancellation nor a progress tick is pending: consume the next record.
			rec, err := reader.ReadRecord(ctx)
			if err != nil {
				if errors.Is(err, io.EOF) {
					return nil
				}
				return err
			}

			// Anything that is not a buy is counted as a sell for progress reporting.
			if rec.Side().IsBuy() {
				buysCount++
			} else {
				sellsCount++
			}
			lastTimestamp = rec.Timestamp()

			symbol := rec.Symbol()
			buyQueue, ok := buys[symbol]
			if !ok {
				buyQueue = new(FillerQueue)
				buys[symbol] = buyQueue
			}

			if err := processRecord(ctx, buyQueue, rec, sel, writer); err != nil {
				return fmt.Errorf("processing record: %w", err)
			}
		}
	}
}
// processRecord either adds a buy to the queue or consumes buys from the queue when processing a
// sell record. q is the buy queue of the record's symbol.
//
// Selectors are only applied to sells for performance reasons: it is much cheaper to just
// accumulate buys and only actually inspect a record once a sell happens.
//
// A selected sell is matched against the buys in q until its whole quantity is covered; one
// ReportItem is written per matched buy.
//
// It returns ErrInsufficientBoughtVolume when q runs out of buys before the sell is fully
// matched, a wrapped error when writer fails, and an error for any side other than SideBuy or
// SideSell.
func processRecord(ctx context.Context, q *FillerQueue, rec Record, sel Selector, writer ReportWriter) error {
	slog.DebugContext(ctx, "Report: processing record",
		slog.String("symbol", rec.Symbol()),
		slog.String("side", rec.Side().String()),
	)

	switch rec.Side() {
	case SideBuy:
		q.Push(NewFiller(rec))

	case SideSell:
		// NOTE(review): a skipped sell leaves q untouched, so the buys it would have matched stay
		// available to later sells — confirm this is the intended matching behaviour.
		if !sel(rec) {
			slog.DebugContext(ctx, "Report: skipping record",
				slog.String("symbol", rec.Symbol()),
				slog.String("side", rec.Side().String()),
			)
			return nil
		}

		unmatchedQty := rec.Quantity()
		for unmatchedQty.IsPositive() {
			buy, ok := q.Peek()
			if !ok {
				return ErrInsufficientBoughtVolume
			}

			matchedQty, filled := buy.Fill(unmatchedQty)
			if filled {
				// The buy is used up: drop it so the next iteration sees the following buy.
				if _, ok := q.Pop(); !ok {
					return errors.New("pop empty filler queue")
				}
			}
			unmatchedQty = unmatchedQty.Sub(matchedQty)

			buyValue := matchedQty.Mul(buy.Price())
			sellValue := matchedQty.Mul(rec.Price())

			// NOTE(review): the sell's full fees and taxes are added to every item, so a sell
			// that spans several buys reports them more than once; whether buy.Fees() and
			// buy.Taxes() are prorated depends on the Filler implementation — confirm.
			err := writer.Write(ctx, ReportItem{
				Symbol:        rec.Symbol(),
				BrokerCountry: rec.BrokerCountry(),
				AssetCountry:  rec.AssetCountry(),
				BuyValue:      buyValue,
				BuyTimestamp:  buy.Timestamp(),
				SellValue:     sellValue,
				SellTimestamp: rec.Timestamp(),
				Fees:          buy.Fees().Add(rec.Fees()),
				Taxes:         buy.Taxes().Add(rec.Taxes()),
				Nature:        buy.Nature(),
			})
			if err != nil {
				return fmt.Errorf("write report item: %w", err)
			}
		}

	default:
		return fmt.Errorf("unknown side: %v", rec.Side())
	}

	return nil
}