From 70bd8622de28f02fc35c25af181ff18081897be9 Mon Sep 17 00:00:00 2001 From: Natercio Moniz Date: Thu, 13 Nov 2025 14:07:08 +0000 Subject: [PATCH] isolate record reading and writing from processing --- go.mod | 12 +++- go.sum | 6 ++ internal/report.go | 97 +++++++++++++++++++++++++++++ internal/report_test.go | 59 ++++++++++++++++++ internal/reporter.go | 125 -------------------------------------- internal/reporter_test.go | 42 ------------- internal/stdout.go | 31 ++++++++++ internal/stdout_test.go | 93 ++++++++++++++++++++++++++++ main.go | 7 ++- 9 files changed, 301 insertions(+), 171 deletions(-) create mode 100644 internal/report_test.go delete mode 100644 internal/reporter.go delete mode 100644 internal/reporter_test.go create mode 100644 internal/stdout.go create mode 100644 internal/stdout_test.go diff --git a/go.mod b/go.mod index eb9e8be..12ea300 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,14 @@ module git.naterciomoniz.net/applications/broker2anexoj go 1.25.3 -require go.uber.org/mock v0.6.0 +require ( + go.uber.org/mock v0.6.0 + golang.org/x/sync v0.18.0 +) + +require ( + golang.org/x/mod v0.27.0 // indirect + golang.org/x/tools v0.36.0 // indirect +) + +tool go.uber.org/mock/mockgen diff --git a/go.sum b/go.sum index 3a696b9..3e96aa3 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,8 @@ go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y= go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU= +golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ= +golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg= +golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s= diff --git a/internal/report.go b/internal/report.go index bef2e8f..5253d0b 100644 --- a/internal/report.go +++ b/internal/report.go @@ -1,5 +1,13 @@ package internal +import ( + "errors" + "fmt" + "io" + "math/big" + "time" +) + type RecordReader interface { // ReadRecord should return Records until an error is found. ReadRecord() (Record, error) @@ -9,3 +17,92 @@ type ReportWriter interface { // ReportWriter writes report items Write(ReportItem) error } + +func BuildReport(reader RecordReader, writer ReportWriter) error { + buys := make(map[string]*RecordQueue) + + for { + rec, err := reader.ReadRecord() + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + + buyQueue, ok := buys[rec.Symbol()] + if !ok { + buyQueue = new(RecordQueue) + buys[rec.Symbol()] = buyQueue + } + + err = processRecord(buyQueue, rec, writer) + if err != nil { + return fmt.Errorf("processing record: %w", err) + } + } +} + +func processRecord(q *RecordQueue, rec Record, writer ReportWriter) error { + switch rec.Side() { + case SideBuy: + q.Push(rec) + + case SideSell: + unmatchedQty := new(big.Float).Copy(rec.Quantity()) + zero := new(big.Float) + + for unmatchedQty.Cmp(zero) > 0 { + buy, ok := q.Peek() + if !ok { + return ErrInsufficientBoughtVolume + } + + var matchedQty *big.Float + if buy.Quantity().Cmp(unmatchedQty) > 0 { + matchedQty = unmatchedQty + buy.Quantity().Sub(buy.Quantity(), unmatchedQty) + } else { + matchedQty = buy.Quantity() + q.Pop() + } + + unmatchedQty.Sub(unmatchedQty, matchedQty) + + sellValue := new(big.Float).Mul(matchedQty, rec.Price()) + buyValue := new(big.Float).Mul(matchedQty, buy.Price()) + + err := writer.Write(ReportItem{ + BuyValue: buyValue, + BuyTimestamp: buy.Timestamp(), + SellValue: sellValue, + SellTimestamp: rec.Timestamp(), + Fees: new(big.Float).Add(buy.Fees(), rec.Fees()), + Taxes: new(big.Float).Add(buy.Taxes(), rec.Fees()), + }) + if err != nil { + return fmt.Errorf("write report item: %w", err) + } + } + + default: + return fmt.Errorf("unknown side: %v", rec.Side()) + } + + return nil +} + +type ReportItem struct { + BuyValue *big.Float + BuyTimestamp time.Time + SellValue *big.Float + SellTimestamp time.Time + Fees *big.Float + Taxes *big.Float +} + +func (ri ReportItem) RealisedPnL() *big.Float { + return new(big.Float).Sub(ri.SellValue, ri.BuyValue) +} + +var ErrInsufficientBoughtVolume = fmt.Errorf("insufficient bought volume") diff --git a/internal/report_test.go b/internal/report_test.go new file mode 100644 index 0000000..345335d --- /dev/null +++ b/internal/report_test.go @@ -0,0 +1,59 @@ +package internal_test + +import ( + "io" + "math/big" + "testing" + "time" + + "git.naterciomoniz.net/applications/broker2anexoj/internal" + "git.naterciomoniz.net/applications/broker2anexoj/internal/mocks" + "go.uber.org/mock/gomock" +) + +func TestReporter_Run(t *testing.T) { + now := time.Now() + ctrl := gomock.NewController(t) + + reader := mocks.NewMockRecordReader(ctrl) + records := []internal.Record{ + mockRecord(ctrl, 20.0, 10.0, internal.SideBuy, now), + mockRecord(ctrl, 25.0, 10.0, internal.SideSell, now.Add(1)), + } + reader.EXPECT().ReadRecord().DoAndReturn(func() (internal.Record, error) { + if len(records) > 0 { + r := records[0] + records = records[1:] + return r, nil + } else { + return nil, io.EOF + } + }).Times(3) + + writer := mocks.NewMockReportWriter(ctrl) + writer.EXPECT().Write(gomock.Eq(internal.ReportItem{ + BuyValue: new(big.Float).SetFloat64(200.0), + BuyTimestamp: now, + SellValue: new(big.Float).SetFloat64(250.0), + SellTimestamp: now.Add(1), + Fees: new(big.Float), + Taxes: new(big.Float), + })).Times(1) + + gotErr := internal.BuildReport(reader, writer) + if gotErr != nil { + t.Fatalf("got unexpected err: %v", gotErr) + } +} + +func mockRecord(ctrl *gomock.Controller, price, quantity float64, side internal.Side, ts time.Time) *mocks.MockRecord { + rec := mocks.NewMockRecord(ctrl) + rec.EXPECT().Price().Return(big.NewFloat(price)).AnyTimes() + rec.EXPECT().Quantity().Return(big.NewFloat(quantity)).AnyTimes() + rec.EXPECT().Side().Return(side).AnyTimes() + rec.EXPECT().Symbol().Return("TEST").AnyTimes() + rec.EXPECT().Timestamp().Return(ts).AnyTimes() + rec.EXPECT().Fees().Return(new(big.Float)).AnyTimes() + rec.EXPECT().Taxes().Return(new(big.Float)).AnyTimes() + return rec +} diff --git a/internal/reporter.go b/internal/reporter.go deleted file mode 100644 index 9d18964..0000000 --- a/internal/reporter.go +++ /dev/null @@ -1,125 +0,0 @@ -package internal - -import ( - "errors" - "fmt" - "io" - "log/slog" - "math/big" - "sync" -) - -// Reporter consumes each record to produce ReportItem. -type Reporter struct { - reader RecordReader -} - -func NewReporter(rr RecordReader) *Reporter { - return &Reporter{ - reader: rr, - } -} - -func (r *Reporter) Run() error { - forewarders := make(map[string]chan Record) - - aggregator := make(chan processResult) - defer close(aggregator) - - go func() { - for result := range aggregator { - fmt.Printf("%v\n", result) - } - }() - - wg := sync.WaitGroup{} - defer func() { - wg.Wait() - }() - - for { - rec, err := r.reader.ReadRecord() - if err != nil { - if errors.Is(err, io.EOF) { - return nil - } - return err - } - - router, ok := forewarders[rec.Symbol()] - if !ok { - router = make(chan Record, 1) - defer close(router) - - wg.Go(func() { - processRecords(router, aggregator) - }) - - forewarders[rec.Symbol()] = router - } - - router <- rec - } -} - -func processRecords(records <-chan Record, results chan<- processResult) { - var q RecordQueue - - for rec := range records { - switch rec.Side() { - case SideBuy: - q.Push(rec) - - case SideSell: - unmatchedQty := new(big.Float).Copy(rec.Quantity()) - zero := new(big.Float) - - for unmatchedQty.Cmp(zero) > 0 { - buy, ok := q.Pop() - if !ok { - results <- processResult{ - err: ErrSellWithoutBuy, - } - return - } - - var matchedQty *big.Float - if buy.Quantity().Cmp(unmatchedQty) > 0 { - matchedQty = unmatchedQty - buy.Quantity().Sub(buy.Quantity(), unmatchedQty) - } else { - matchedQty = buy.Quantity() - } - - unmatchedQty.Sub(unmatchedQty, matchedQty) - - sellValue := new(big.Float).Mul(matchedQty, rec.Price()) - buyValue := new(big.Float).Mul(matchedQty, buy.Price()) - realisedPnL := new(big.Float).Sub(sellValue, buyValue) - slog.Info("Realised PnL", - slog.Any("Symbol", rec.Symbol()), - slog.Any("PnL", realisedPnL), - slog.Any("Timestamp", rec.Timestamp())) - - results <- processResult{ - item: ReportItem{}, - } - } - - default: - results <- processResult{ - err: fmt.Errorf("unknown side: %v", rec.Side()), - } - return - } - } -} - -type processResult struct { - item ReportItem - err error -} - -type ReportItem struct{} - -var ErrSellWithoutBuy = fmt.Errorf("found sell without bought volume") diff --git a/internal/reporter_test.go b/internal/reporter_test.go deleted file mode 100644 index 8d5be16..0000000 --- a/internal/reporter_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package internal_test - -import ( - "io" - "math/big" - "testing" - - "git.naterciomoniz.net/applications/broker2anexoj/internal" - "git.naterciomoniz.net/applications/broker2anexoj/internal/mocks" - "go.uber.org/mock/gomock" -) - -func TestReporter_Run(t *testing.T) { - ctrl := gomock.NewController(t) - - rec := mocks.NewMockRecord(ctrl) - rec.EXPECT().Price().Return(big.NewFloat(1.25)).AnyTimes() - rec.EXPECT().Quantity().Return(big.NewFloat(10)).AnyTimes() - rec.EXPECT().Side().Return(internal.SideBuy).AnyTimes() - rec.EXPECT().Symbol().Return("TEST").AnyTimes() - - reader := mocks.NewMockRecordReader(ctrl) - records := []internal.Record{ - rec, - rec, - } - reader.EXPECT().ReadRecord().DoAndReturn(func() (internal.Record, error) { - if len(records) > 0 { - r := records[0] - records = records[1:] - return r, nil - } else { - return nil, io.EOF - } - }).AnyTimes() - - reporter := internal.NewReporter(reader) - gotErr := reporter.Run() - if gotErr != nil { - t.Fatalf("got unexpected err: %v", gotErr) - } -} diff --git a/internal/stdout.go b/internal/stdout.go new file mode 100644 index 0000000..04b930b --- /dev/null +++ b/internal/stdout.go @@ -0,0 +1,31 @@ +package internal + +import ( + "fmt" + "io" + "os" + "time" +) + +type ReportLogger struct { + counter int + writer io.Writer +} + +func NewStdOutLogger() *ReportLogger { + return &ReportLogger{ + writer: os.Stdout, + } +} + +func NewReportLogger(w io.Writer) *ReportLogger { + return &ReportLogger{ + writer: w, + } +} + +func (rl *ReportLogger) Write(ri ReportItem) error { + rl.counter++ + _, err := fmt.Fprintf(rl.writer, "%6d - realised %+f on %s\n", rl.counter, ri.RealisedPnL(), ri.SellTimestamp.Format(time.RFC3339)) + return err +} diff --git a/internal/stdout_test.go b/internal/stdout_test.go new file mode 100644 index 0000000..5a0027b --- /dev/null +++ b/internal/stdout_test.go @@ -0,0 +1,93 @@ +package internal_test + +import ( + "bytes" + "fmt" + "math/big" + "testing" + "time" + + "git.naterciomoniz.net/applications/broker2anexoj/internal" +) + +func TestReportLogger_Write(t *testing.T) { + tNow := time.Now() + + tests := []struct { + name string + items []internal.ReportItem + want []string + }{ + { + name: "empty", + }, + { + name: "single item positive", + items: []internal.ReportItem{ + { + BuyValue: new(big.Float).SetFloat64(100.0), + SellValue: new(big.Float).SetFloat64(200.0), + SellTimestamp: tNow, + }, + }, + want: []string{ + fmt.Sprintf("%6d - realised +100.000000 on %s\n", 1, tNow.Format(time.RFC3339)), + }, + }, + { + name: "single item negative", + items: []internal.ReportItem{ + { + BuyValue: new(big.Float).SetFloat64(200.0), + SellValue: new(big.Float).SetFloat64(150.0), + SellTimestamp: tNow, + }, + }, + want: []string{ + fmt.Sprintf("%6d - realised -50.000000 on %s\n", 1, tNow.Format(time.RFC3339)), + }, + }, + { + name: "multiple items", + items: []internal.ReportItem{ + { + BuyValue: new(big.Float).SetFloat64(100.0), + SellValue: new(big.Float).SetFloat64(200.0), + SellTimestamp: tNow, + }, + { + BuyValue: new(big.Float).SetFloat64(200.0), + SellValue: new(big.Float).SetFloat64(150.0), + SellTimestamp: tNow.Add(1), + }, + }, + want: []string{ + fmt.Sprintf("%6d - realised +100.000000 on %s\n", 1, tNow.Format(time.RFC3339)), + fmt.Sprintf("%6d - realised -50.000000 on %s\n", 2, tNow.Add(1).Format(time.RFC3339)), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + buf := new(bytes.Buffer) + rw := internal.NewReportLogger(buf) + + for _, item := range tt.items { + err := rw.Write(item) + if err != nil { + t.Fatalf("unexpected error on write: %v", err) + } + } + + for _, wantLine := range tt.want { + gotLine, err := buf.ReadString(byte('\n')) + if err != nil { + t.Fatalf("unexpected error on buffer read: %v", err) + } + if wantLine != gotLine { + t.Fatalf("want line %q but got %q", wantLine, gotLine) + } + } + }) + } +} diff --git a/main.go b/main.go index a561cd9..2004426 100644 --- a/main.go +++ b/main.go @@ -12,7 +12,8 @@ import ( func main() { err := run() if err != nil { - slog.Error("fatal error", slog.Any("err", err)) + slog.Error("found a fatal issue", slog.Any("err", err)) + os.Exit(1) } } @@ -24,9 +25,9 @@ func run() error { reader := trading212.NewRecordReader(f) - reporter := internal.NewReporter(reader) + writer := internal.NewStdOutLogger() - err = reporter.Run() + err = internal.BuildReport(reader, writer) if err != nil { return err }