Merge pull request 'isolate record reading and writing from processing' (#4) from refactor-report-logic into main

Reviewed-on: applications/broker2anexoj#4
This commit was merged in pull request #4.
This commit is contained in:
2025-11-13 14:22:35 +00:00
9 changed files with 301 additions and 171 deletions

12
go.mod
View File

@@ -2,4 +2,14 @@ module git.naterciomoniz.net/applications/broker2anexoj
go 1.25.3
require go.uber.org/mock v0.6.0
require (
go.uber.org/mock v0.6.0
golang.org/x/sync v0.18.0
)
require (
golang.org/x/mod v0.27.0 // indirect
golang.org/x/tools v0.36.0 // indirect
)
tool go.uber.org/mock/mockgen

6
go.sum
View File

@@ -1,2 +1,8 @@
go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y=
go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU=
golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ=
golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=

View File

@@ -1,5 +1,13 @@
package internal
import (
"errors"
"fmt"
"io"
"math/big"
"time"
)
type RecordReader interface {
// ReadRecord should return Records until an error is found.
ReadRecord() (Record, error)
@@ -9,3 +17,92 @@ type ReportWriter interface {
// ReportWriter writes report items
Write(ReportItem) error
}
func BuildReport(reader RecordReader, writer ReportWriter) error {
buys := make(map[string]*RecordQueue)
for {
rec, err := reader.ReadRecord()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
buyQueue, ok := buys[rec.Symbol()]
if !ok {
buyQueue = new(RecordQueue)
buys[rec.Symbol()] = buyQueue
}
err = processRecord(buyQueue, rec, writer)
if err != nil {
return fmt.Errorf("processing record: %w", err)
}
}
}
func processRecord(q *RecordQueue, rec Record, writer ReportWriter) error {
switch rec.Side() {
case SideBuy:
q.Push(rec)
case SideSell:
unmatchedQty := new(big.Float).Copy(rec.Quantity())
zero := new(big.Float)
for unmatchedQty.Cmp(zero) > 0 {
buy, ok := q.Peek()
if !ok {
return ErrInsufficientBoughtVolume
}
var matchedQty *big.Float
if buy.Quantity().Cmp(unmatchedQty) > 0 {
matchedQty = unmatchedQty
buy.Quantity().Sub(buy.Quantity(), unmatchedQty)
} else {
matchedQty = buy.Quantity()
q.Pop()
}
unmatchedQty.Sub(unmatchedQty, matchedQty)
sellValue := new(big.Float).Mul(matchedQty, rec.Price())
buyValue := new(big.Float).Mul(matchedQty, buy.Price())
err := writer.Write(ReportItem{
BuyValue: buyValue,
BuyTimestamp: buy.Timestamp(),
SellValue: sellValue,
SellTimestamp: rec.Timestamp(),
Fees: new(big.Float).Add(buy.Fees(), rec.Fees()),
Taxes: new(big.Float).Add(buy.Taxes(), rec.Fees()),
})
if err != nil {
return fmt.Errorf("write report item: %w", err)
}
}
default:
return fmt.Errorf("unknown side: %v", rec.Side())
}
return nil
}
type ReportItem struct {
BuyValue *big.Float
BuyTimestamp time.Time
SellValue *big.Float
SellTimestamp time.Time
Fees *big.Float
Taxes *big.Float
}
func (ri ReportItem) RealisedPnL() *big.Float {
return new(big.Float).Sub(ri.SellValue, ri.BuyValue)
}
var ErrInsufficientBoughtVolume = fmt.Errorf("insufficient bought volume")

59
internal/report_test.go Normal file
View File

@@ -0,0 +1,59 @@
package internal_test
import (
"io"
"math/big"
"testing"
"time"
"git.naterciomoniz.net/applications/broker2anexoj/internal"
"git.naterciomoniz.net/applications/broker2anexoj/internal/mocks"
"go.uber.org/mock/gomock"
)
func TestReporter_Run(t *testing.T) {
now := time.Now()
ctrl := gomock.NewController(t)
reader := mocks.NewMockRecordReader(ctrl)
records := []internal.Record{
mockRecord(ctrl, 20.0, 10.0, internal.SideBuy, now),
mockRecord(ctrl, 25.0, 10.0, internal.SideSell, now.Add(1)),
}
reader.EXPECT().ReadRecord().DoAndReturn(func() (internal.Record, error) {
if len(records) > 0 {
r := records[0]
records = records[1:]
return r, nil
} else {
return nil, io.EOF
}
}).Times(3)
writer := mocks.NewMockReportWriter(ctrl)
writer.EXPECT().Write(gomock.Eq(internal.ReportItem{
BuyValue: new(big.Float).SetFloat64(200.0),
BuyTimestamp: now,
SellValue: new(big.Float).SetFloat64(250.0),
SellTimestamp: now.Add(1),
Fees: new(big.Float),
Taxes: new(big.Float),
})).Times(1)
gotErr := internal.BuildReport(reader, writer)
if gotErr != nil {
t.Fatalf("got unexpected err: %v", gotErr)
}
}
func mockRecord(ctrl *gomock.Controller, price, quantity float64, side internal.Side, ts time.Time) *mocks.MockRecord {
rec := mocks.NewMockRecord(ctrl)
rec.EXPECT().Price().Return(big.NewFloat(price)).AnyTimes()
rec.EXPECT().Quantity().Return(big.NewFloat(quantity)).AnyTimes()
rec.EXPECT().Side().Return(side).AnyTimes()
rec.EXPECT().Symbol().Return("TEST").AnyTimes()
rec.EXPECT().Timestamp().Return(ts).AnyTimes()
rec.EXPECT().Fees().Return(new(big.Float)).AnyTimes()
rec.EXPECT().Taxes().Return(new(big.Float)).AnyTimes()
return rec
}

View File

@@ -1,125 +0,0 @@
package internal
import (
"errors"
"fmt"
"io"
"log/slog"
"math/big"
"sync"
)
// Reporter consumes each record to produce ReportItem.
type Reporter struct {
reader RecordReader
}
func NewReporter(rr RecordReader) *Reporter {
return &Reporter{
reader: rr,
}
}
func (r *Reporter) Run() error {
forewarders := make(map[string]chan Record)
aggregator := make(chan processResult)
defer close(aggregator)
go func() {
for result := range aggregator {
fmt.Printf("%v\n", result)
}
}()
wg := sync.WaitGroup{}
defer func() {
wg.Wait()
}()
for {
rec, err := r.reader.ReadRecord()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
router, ok := forewarders[rec.Symbol()]
if !ok {
router = make(chan Record, 1)
defer close(router)
wg.Go(func() {
processRecords(router, aggregator)
})
forewarders[rec.Symbol()] = router
}
router <- rec
}
}
func processRecords(records <-chan Record, results chan<- processResult) {
var q RecordQueue
for rec := range records {
switch rec.Side() {
case SideBuy:
q.Push(rec)
case SideSell:
unmatchedQty := new(big.Float).Copy(rec.Quantity())
zero := new(big.Float)
for unmatchedQty.Cmp(zero) > 0 {
buy, ok := q.Pop()
if !ok {
results <- processResult{
err: ErrSellWithoutBuy,
}
return
}
var matchedQty *big.Float
if buy.Quantity().Cmp(unmatchedQty) > 0 {
matchedQty = unmatchedQty
buy.Quantity().Sub(buy.Quantity(), unmatchedQty)
} else {
matchedQty = buy.Quantity()
}
unmatchedQty.Sub(unmatchedQty, matchedQty)
sellValue := new(big.Float).Mul(matchedQty, rec.Price())
buyValue := new(big.Float).Mul(matchedQty, buy.Price())
realisedPnL := new(big.Float).Sub(sellValue, buyValue)
slog.Info("Realised PnL",
slog.Any("Symbol", rec.Symbol()),
slog.Any("PnL", realisedPnL),
slog.Any("Timestamp", rec.Timestamp()))
results <- processResult{
item: ReportItem{},
}
}
default:
results <- processResult{
err: fmt.Errorf("unknown side: %v", rec.Side()),
}
return
}
}
}
type processResult struct {
item ReportItem
err error
}
type ReportItem struct{}
var ErrSellWithoutBuy = fmt.Errorf("found sell without bought volume")

View File

@@ -1,42 +0,0 @@
package internal_test
import (
"io"
"math/big"
"testing"
"git.naterciomoniz.net/applications/broker2anexoj/internal"
"git.naterciomoniz.net/applications/broker2anexoj/internal/mocks"
"go.uber.org/mock/gomock"
)
func TestReporter_Run(t *testing.T) {
ctrl := gomock.NewController(t)
rec := mocks.NewMockRecord(ctrl)
rec.EXPECT().Price().Return(big.NewFloat(1.25)).AnyTimes()
rec.EXPECT().Quantity().Return(big.NewFloat(10)).AnyTimes()
rec.EXPECT().Side().Return(internal.SideBuy).AnyTimes()
rec.EXPECT().Symbol().Return("TEST").AnyTimes()
reader := mocks.NewMockRecordReader(ctrl)
records := []internal.Record{
rec,
rec,
}
reader.EXPECT().ReadRecord().DoAndReturn(func() (internal.Record, error) {
if len(records) > 0 {
r := records[0]
records = records[1:]
return r, nil
} else {
return nil, io.EOF
}
}).AnyTimes()
reporter := internal.NewReporter(reader)
gotErr := reporter.Run()
if gotErr != nil {
t.Fatalf("got unexpected err: %v", gotErr)
}
}

31
internal/stdout.go Normal file
View File

@@ -0,0 +1,31 @@
package internal
import (
"fmt"
"io"
"os"
"time"
)
type ReportLogger struct {
counter int
writer io.Writer
}
func NewStdOutLogger() *ReportLogger {
return &ReportLogger{
writer: os.Stdout,
}
}
func NewReportLogger(w io.Writer) *ReportLogger {
return &ReportLogger{
writer: w,
}
}
func (rl *ReportLogger) Write(ri ReportItem) error {
rl.counter++
_, err := fmt.Fprintf(rl.writer, "%6d - realised %+f on %s\n", rl.counter, ri.RealisedPnL(), ri.SellTimestamp.Format(time.RFC3339))
return err
}

93
internal/stdout_test.go Normal file
View File

@@ -0,0 +1,93 @@
package internal_test
import (
"bytes"
"fmt"
"math/big"
"testing"
"time"
"git.naterciomoniz.net/applications/broker2anexoj/internal"
)
func TestReportLogger_Write(t *testing.T) {
tNow := time.Now()
tests := []struct {
name string
items []internal.ReportItem
want []string
}{
{
name: "empty",
},
{
name: "single item positive",
items: []internal.ReportItem{
{
BuyValue: new(big.Float).SetFloat64(100.0),
SellValue: new(big.Float).SetFloat64(200.0),
SellTimestamp: tNow,
},
},
want: []string{
fmt.Sprintf("%6d - realised +100.000000 on %s\n", 1, tNow.Format(time.RFC3339)),
},
},
{
name: "single item negative",
items: []internal.ReportItem{
{
BuyValue: new(big.Float).SetFloat64(200.0),
SellValue: new(big.Float).SetFloat64(150.0),
SellTimestamp: tNow,
},
},
want: []string{
fmt.Sprintf("%6d - realised -50.000000 on %s\n", 1, tNow.Format(time.RFC3339)),
},
},
{
name: "multiple items",
items: []internal.ReportItem{
{
BuyValue: new(big.Float).SetFloat64(100.0),
SellValue: new(big.Float).SetFloat64(200.0),
SellTimestamp: tNow,
},
{
BuyValue: new(big.Float).SetFloat64(200.0),
SellValue: new(big.Float).SetFloat64(150.0),
SellTimestamp: tNow.Add(1),
},
},
want: []string{
fmt.Sprintf("%6d - realised +100.000000 on %s\n", 1, tNow.Format(time.RFC3339)),
fmt.Sprintf("%6d - realised -50.000000 on %s\n", 2, tNow.Add(1).Format(time.RFC3339)),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buf := new(bytes.Buffer)
rw := internal.NewReportLogger(buf)
for _, item := range tt.items {
err := rw.Write(item)
if err != nil {
t.Fatalf("unexpected error on write: %v", err)
}
}
for _, wantLine := range tt.want {
gotLine, err := buf.ReadString(byte('\n'))
if err != nil {
t.Fatalf("unexpected error on buffer read: %v", err)
}
if wantLine != gotLine {
t.Fatalf("want line %q but got %q", wantLine, gotLine)
}
}
})
}
}

View File

@@ -12,7 +12,8 @@ import (
func main() {
err := run()
if err != nil {
slog.Error("fatal error", slog.Any("err", err))
slog.Error("found a fatal issue", slog.Any("err", err))
os.Exit(1)
}
}
@@ -24,9 +25,9 @@ func run() error {
reader := trading212.NewRecordReader(f)
reporter := internal.NewReporter(reader)
writer := internal.NewStdOutLogger()
err = reporter.Run()
err = internal.BuildReport(reader, writer)
if err != nil {
return err
}