diff --git a/main.go b/cmd/broker2anexoj-cli/main.go similarity index 62% rename from main.go rename to cmd/broker2anexoj-cli/main.go index 2004426..4ee7bcd 100644 --- a/main.go +++ b/cmd/broker2anexoj-cli/main.go @@ -1,23 +1,31 @@ package main import ( + "context" "fmt" "log/slog" "os" + "os/signal" "git.naterciomoniz.net/applications/broker2anexoj/internal" "git.naterciomoniz.net/applications/broker2anexoj/internal/trading212" + "golang.org/x/sync/errgroup" ) func main() { - err := run() + err := run(context.Background()) if err != nil { slog.Error("found a fatal issue", slog.Any("err", err)) os.Exit(1) } } -func run() error { +func run(ctx context.Context) error { + ctx, cancel := signal.NotifyContext(ctx, os.Kill, os.Interrupt) + defer cancel() + + eg, ctx := errgroup.WithContext(ctx) + f, err := os.Open("test.csv") if err != nil { return fmt.Errorf("open statement: %w", err) @@ -27,7 +35,11 @@ func run() error { writer := internal.NewStdOutLogger() - err = internal.BuildReport(reader, writer) + eg.Go(func() error { + return internal.BuildReport(ctx, reader, writer) + }) + + err = eg.Wait() if err != nil { return err } diff --git a/internal/report.go b/internal/report.go index 5253d0b..a0018d6 100644 --- a/internal/report.go +++ b/internal/report.go @@ -1,6 +1,7 @@ package internal import ( + "context" "errors" "fmt" "io" @@ -10,40 +11,45 @@ import ( type RecordReader interface { // ReadRecord should return Records until an error is found. - ReadRecord() (Record, error) + ReadRecord(context.Context) (Record, error) } type ReportWriter interface { // ReportWriter writes report items - Write(ReportItem) error + Write(context.Context, ReportItem) error } -func BuildReport(reader RecordReader, writer ReportWriter) error { +func BuildReport(ctx context.Context, reader RecordReader, writer ReportWriter) error { buys := make(map[string]*RecordQueue) for { - rec, err := reader.ReadRecord() - if err != nil { - if errors.Is(err, io.EOF) { - return nil + select { + case <-ctx.Done(): + return ctx.Err() + default: + rec, err := reader.ReadRecord(ctx) + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err } - return err - } - buyQueue, ok := buys[rec.Symbol()] - if !ok { - buyQueue = new(RecordQueue) - buys[rec.Symbol()] = buyQueue - } + buyQueue, ok := buys[rec.Symbol()] + if !ok { + buyQueue = new(RecordQueue) + buys[rec.Symbol()] = buyQueue + } - err = processRecord(buyQueue, rec, writer) - if err != nil { - return fmt.Errorf("processing record: %w", err) + err = processRecord(ctx, buyQueue, rec, writer) + if err != nil { + return fmt.Errorf("processing record: %w", err) + } } } } -func processRecord(q *RecordQueue, rec Record, writer ReportWriter) error { +func processRecord(ctx context.Context, q *RecordQueue, rec Record, writer ReportWriter) error { switch rec.Side() { case SideBuy: q.Push(rec) @@ -72,7 +78,7 @@ func processRecord(q *RecordQueue, rec Record, writer ReportWriter) error { sellValue := new(big.Float).Mul(matchedQty, rec.Price()) buyValue := new(big.Float).Mul(matchedQty, buy.Price()) - err := writer.Write(ReportItem{ + err := writer.Write(ctx, ReportItem{ BuyValue: buyValue, BuyTimestamp: buy.Timestamp(), SellValue: sellValue, diff --git a/internal/report_test.go b/internal/report_test.go index 345335d..99ecd2f 100644 --- a/internal/report_test.go +++ b/internal/report_test.go @@ -1,6 +1,7 @@ package internal_test import ( + "context" "io" "math/big" "testing" @@ -20,7 +21,7 @@ func TestReporter_Run(t *testing.T) { mockRecord(ctrl, 20.0, 10.0, internal.SideBuy, now), mockRecord(ctrl, 25.0, 10.0, internal.SideSell, now.Add(1)), } - reader.EXPECT().ReadRecord().DoAndReturn(func() (internal.Record, error) { + reader.EXPECT().ReadRecord(gomock.Any()).DoAndReturn(func(ctx context.Context) (internal.Record, error) { if len(records) > 0 { r := records[0] records = records[1:] @@ -31,7 +32,7 @@ func TestReporter_Run(t *testing.T) { }).Times(3) writer := mocks.NewMockReportWriter(ctrl) - writer.EXPECT().Write(gomock.Eq(internal.ReportItem{ + writer.EXPECT().Write(gomock.Any(), gomock.Eq(internal.ReportItem{ BuyValue: new(big.Float).SetFloat64(200.0), BuyTimestamp: now, SellValue: new(big.Float).SetFloat64(250.0), @@ -40,7 +41,7 @@ func TestReporter_Run(t *testing.T) { Taxes: new(big.Float), })).Times(1) - gotErr := internal.BuildReport(reader, writer) + gotErr := internal.BuildReport(t.Context(), reader, writer) if gotErr != nil { t.Fatalf("got unexpected err: %v", gotErr) } diff --git a/internal/stdout.go b/internal/stdout.go index 04b930b..20e36ea 100644 --- a/internal/stdout.go +++ b/internal/stdout.go @@ -1,6 +1,7 @@ package internal import ( + "context" "fmt" "io" "os" @@ -24,7 +25,7 @@ func NewReportLogger(w io.Writer) *ReportLogger { } } -func (rl *ReportLogger) Write(ri ReportItem) error { +func (rl *ReportLogger) Write(_ context.Context, ri ReportItem) error { rl.counter++ _, err := fmt.Fprintf(rl.writer, "%6d - realised %+f on %s\n", rl.counter, ri.RealisedPnL(), ri.SellTimestamp.Format(time.RFC3339)) return err diff --git a/internal/stdout_test.go b/internal/stdout_test.go index 5a0027b..7eb600b 100644 --- a/internal/stdout_test.go +++ b/internal/stdout_test.go @@ -73,7 +73,7 @@ func TestReportLogger_Write(t *testing.T) { rw := internal.NewReportLogger(buf) for _, item := range tt.items { - err := rw.Write(item) + err := rw.Write(t.Context(), item) if err != nil { t.Fatalf("unexpected error on write: %v", err) } diff --git a/internal/trading212/record.go b/internal/trading212/record.go index 1bc2ad5..2eda2c6 100644 --- a/internal/trading212/record.go +++ b/internal/trading212/record.go @@ -1,6 +1,7 @@ package trading212 import ( + "context" "encoding/csv" "fmt" "io" @@ -66,7 +67,7 @@ const ( LimitSell = "limit sell" ) -func (rr RecordReader) ReadRecord() (internal.Record, error) { +func (rr RecordReader) ReadRecord(_ context.Context) (internal.Record, error) { for { raw, err := rr.reader.Read() if err != nil { diff --git a/internal/trading212/record_test.go b/internal/trading212/record_test.go index 0eac006..3f405bb 100644 --- a/internal/trading212/record_test.go +++ b/internal/trading212/record_test.go @@ -93,7 +93,7 @@ func TestRecordReader_ReadRecord(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { rr := NewRecordReader(tt.r) - got, gotErr := rr.ReadRecord() + got, gotErr := rr.ReadRecord(t.Context()) if gotErr != nil { if !tt.wantErr { t.Fatalf("ReadRecord() failed: %v", gotErr)