Files
any2anexoj/internal/reporter.go

126 lines
2.2 KiB
Go

package internal
import (
"errors"
"fmt"
"io"
"log/slog"
"math/big"
"sync"
)
// Reporter consumes each record to produce ReportItem.
type Reporter struct {
reader RecordReader
}
func NewReporter(rr RecordReader) *Reporter {
return &Reporter{
reader: rr,
}
}
func (r *Reporter) Run() error {
forewarders := make(map[string]chan Record)
aggregator := make(chan processResult)
defer close(aggregator)
go func() {
for result := range aggregator {
fmt.Printf("%v\n", result)
}
}()
wg := sync.WaitGroup{}
defer func() {
wg.Wait()
}()
for {
rec, err := r.reader.ReadRecord()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
router, ok := forewarders[rec.Symbol()]
if !ok {
router = make(chan Record, 1)
defer close(router)
wg.Go(func() {
processRecords(router, aggregator)
})
forewarders[rec.Symbol()] = router
}
router <- rec
}
}
func processRecords(records <-chan Record, results chan<- processResult) {
var q RecordQueue
for rec := range records {
switch rec.Side() {
case SideBuy:
q.Push(rec)
case SideSell:
unmatchedQty := new(big.Float).Copy(rec.Quantity())
zero := new(big.Float)
for unmatchedQty.Cmp(zero) > 0 {
buy, ok := q.Pop()
if !ok {
results <- processResult{
err: ErrSellWithoutBuy,
}
return
}
var matchedQty *big.Float
if buy.Quantity().Cmp(unmatchedQty) > 0 {
matchedQty = unmatchedQty
buy.Quantity().Sub(buy.Quantity(), unmatchedQty)
} else {
matchedQty = buy.Quantity()
}
unmatchedQty.Sub(unmatchedQty, matchedQty)
sellValue := new(big.Float).Mul(matchedQty, rec.Price())
buyValue := new(big.Float).Mul(matchedQty, buy.Price())
realisedPnL := new(big.Float).Sub(sellValue, buyValue)
slog.Info("Realised PnL",
slog.Any("Symbol", rec.Symbol()),
slog.Any("PnL", realisedPnL),
slog.Any("Timestamp", rec.Timestamp()))
results <- processResult{
item: ReportItem{},
}
}
default:
results <- processResult{
err: fmt.Errorf("unknown side: %v", rec.Side()),
}
return
}
}
}
type processResult struct {
item ReportItem
err error
}
type ReportItem struct{}
var ErrSellWithoutBuy = fmt.Errorf("found sell without bought volume")