180 lines
4.2 KiB
Go
180 lines
4.2 KiB
Go
package internal
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/shopspring/decimal"
|
|
)
|
|
|
|
type Record interface {
|
|
Symbol() string
|
|
Nature() Nature
|
|
BrokerCountry() int64
|
|
AssetCountry() int64
|
|
Kind() Kind
|
|
Price() decimal.Decimal
|
|
Quantity() decimal.Decimal
|
|
Timestamp() time.Time
|
|
Fees() decimal.Decimal
|
|
Taxes() decimal.Decimal
|
|
}
|
|
|
|
type RecordReader interface {
|
|
// ReadRecord should return Records until an error is found.
|
|
ReadRecord(context.Context) (Record, error)
|
|
}
|
|
|
|
type ReportItem struct {
|
|
Symbol string
|
|
Nature Nature
|
|
BrokerCountry int64
|
|
AssetCountry int64
|
|
BuyValue decimal.Decimal
|
|
BuyTimestamp time.Time
|
|
SellValue decimal.Decimal
|
|
SellTimestamp time.Time
|
|
Fees decimal.Decimal
|
|
Taxes decimal.Decimal
|
|
}
|
|
|
|
func (ri ReportItem) RealisedPnL() decimal.Decimal {
|
|
return ri.SellValue.Sub(ri.BuyValue)
|
|
}
|
|
|
|
type ReportWriter interface {
|
|
// ReportWriter writes report items
|
|
Write(context.Context, ReportItem) error
|
|
}
|
|
|
|
// Selector returns true if a record should be selected for processing, false otherwise.
|
|
type Selector func(Record) bool
|
|
|
|
// BuildReport reads records from a RecordReader and, if the record passes the Selector, it is
|
|
// processed into the ReportWriter.
|
|
func BuildReport(ctx context.Context, reader RecordReader, writer ReportWriter, sel Selector) error {
|
|
buys := make(map[string]*FillerQueue)
|
|
|
|
var buysCount, sellsCount int64
|
|
var lastTimestamp time.Time
|
|
progTicker := time.NewTicker(10 * time.Second)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-progTicker.C:
|
|
slog.InfoContext(ctx, "Progress update",
|
|
slog.Int64("total_records", buysCount+sellsCount),
|
|
slog.Int64("sell_records", sellsCount),
|
|
slog.Int64("buy_records", buysCount),
|
|
slog.Time("last_record_timestamp", lastTimestamp),
|
|
)
|
|
default:
|
|
rec, err := reader.ReadRecord(ctx)
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
if rec.Kind().Is(KindBuy) {
|
|
buysCount++
|
|
} else if rec.Kind().Is(KindSell) {
|
|
sellsCount++
|
|
}
|
|
|
|
lastTimestamp = rec.Timestamp()
|
|
|
|
buyQueue, ok := buys[rec.Symbol()]
|
|
if !ok {
|
|
buyQueue = new(FillerQueue)
|
|
buys[rec.Symbol()] = buyQueue
|
|
}
|
|
|
|
err = processRecord(ctx, buyQueue, rec, sel, writer)
|
|
if err != nil {
|
|
return fmt.Errorf("processing record: %w", err)
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
// processRecord either adds buys to the queue or consumes buys from the queue when processing a
|
|
// sell record.
|
|
// Selectors are only applied on sells for performance reasons. It's much cheaper to just accumulate
|
|
// buys and only actually inspect a record once a sell happens due to potential network requests to
|
|
func processRecord(ctx context.Context, q *FillerQueue, rec Record, sel Selector, writer ReportWriter) error {
|
|
slog.Debug("Report: processing record",
|
|
slog.String("symbol", rec.Symbol()),
|
|
slog.String("side", rec.Kind().String()),
|
|
)
|
|
|
|
switch rec.Kind() {
|
|
case KindBuy:
|
|
q.Push(NewFiller(rec))
|
|
|
|
case KindSell:
|
|
if !sel(rec) {
|
|
slog.Debug("Report: skipping record",
|
|
slog.String("symbol", rec.Symbol()),
|
|
slog.String("side", rec.Kind().String()),
|
|
)
|
|
return nil
|
|
}
|
|
|
|
unmatchedQty := rec.Quantity()
|
|
|
|
for unmatchedQty.IsPositive() {
|
|
buy, ok := q.Peek()
|
|
if !ok {
|
|
return ErrInsufficientBoughtVolume
|
|
}
|
|
|
|
matchedQty, filled := buy.Fill(unmatchedQty)
|
|
|
|
if filled {
|
|
_, ok := q.Pop()
|
|
if !ok {
|
|
return fmt.Errorf("pop empty filler queue")
|
|
}
|
|
}
|
|
|
|
unmatchedQty = unmatchedQty.Sub(matchedQty)
|
|
|
|
buyValue := matchedQty.Mul(buy.Price())
|
|
sellValue := matchedQty.Mul(rec.Price())
|
|
|
|
err := writer.Write(ctx, ReportItem{
|
|
Symbol: rec.Symbol(),
|
|
BrokerCountry: rec.BrokerCountry(),
|
|
AssetCountry: rec.AssetCountry(),
|
|
BuyValue: buyValue,
|
|
BuyTimestamp: buy.Timestamp(),
|
|
SellValue: sellValue,
|
|
SellTimestamp: rec.Timestamp(),
|
|
Fees: buy.Fees().Add(rec.Fees()),
|
|
Taxes: buy.Taxes().Add(rec.Taxes()),
|
|
Nature: buy.Nature(),
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("write report item: %w", err)
|
|
}
|
|
}
|
|
|
|
case KindSplit:
|
|
q.AdjustForSplit(rec.Quantity())
|
|
|
|
default:
|
|
return fmt.Errorf("unknown side: %v", rec.Kind())
|
|
}
|
|
|
|
return nil
|
|
}
|