Compare commits
6 Commits
840919e8aa
...
d371aca767
| Author | SHA1 | Date | |
|---|---|---|---|
| d371aca767 | |||
| 24c2814eef | |||
| c110a2cc70 | |||
| 1c29f52cce | |||
| b0d91e7eee | |||
| 91885b1993 |
@@ -21,14 +21,10 @@ var (
|
|||||||
// remove/change default
|
// remove/change default
|
||||||
platform = pflag.StringP("platform", "p", "trading212", "One of the supported platforms")
|
platform = pflag.StringP("platform", "p", "trading212", "One of the supported platforms")
|
||||||
lang = pflag.StringP("language", "l", language.Portuguese.String(), "The 2 letter language code")
|
lang = pflag.StringP("language", "l", language.Portuguese.String(), "The 2 letter language code")
|
||||||
|
debug = pflag.BoolP("debug", "d", false, "Activate to log debug messages")
|
||||||
|
ofAPIKey = pflag.String("open-figi-api-key", "", "An OpenFIGI API key for faster report generation (better rate api rate limits)")
|
||||||
// TODO: improve documentation on selectors
|
// TODO: improve documentation on selectors
|
||||||
selectors = pflag.StringSlice("selectors", nil, "Only process entries that conform to all the selectors:")
|
selectors = pflag.StringSlice("selectors", nil, "Only process entries that conform to all the selectors:")
|
||||||
|
|
||||||
readerFactories = map[string]func() internal.RecordReader{
|
|
||||||
"trading212": func() internal.RecordReader {
|
|
||||||
return trading212.NewRecordReader(os.Stdin, internal.NewOpenFIGI(&http.Client{Timeout: 5 * time.Second}))
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -42,6 +38,17 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func run(ctx context.Context) error {
|
func run(ctx context.Context) error {
|
||||||
|
ctx, cancel := signal.NotifyContext(ctx, os.Kill, os.Interrupt)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
|
logLevel := slog.LevelInfo
|
||||||
|
if *debug {
|
||||||
|
logLevel = slog.LevelDebug
|
||||||
|
}
|
||||||
|
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})))
|
||||||
|
|
||||||
if platform == nil || len(*platform) == 0 {
|
if platform == nil || len(*platform) == 0 {
|
||||||
slog.Error("--platform flag is required")
|
slog.Error("--platform flag is required")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
@@ -52,20 +59,11 @@ func run(ctx context.Context) error {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := signal.NotifyContext(ctx, os.Kill, os.Interrupt)
|
reader, err := getReader(*platform, *ofAPIKey)
|
||||||
defer cancel()
|
if err != nil {
|
||||||
|
return fmt.Errorf("getting reader: %w", err)
|
||||||
eg, ctx := errgroup.WithContext(ctx)
|
|
||||||
|
|
||||||
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, nil)))
|
|
||||||
|
|
||||||
factory, ok := readerFactories[*platform]
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("unsupported platform: %s", *platform)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
reader := factory()
|
|
||||||
|
|
||||||
writer := internal.NewAggregatorWriter()
|
writer := internal.NewAggregatorWriter()
|
||||||
|
|
||||||
selector, err := internal.ParseSelectors(*selectors)
|
selector, err := internal.ParseSelectors(*selectors)
|
||||||
@@ -93,3 +91,12 @@ func run(ctx context.Context) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getReader(platform string, ofAPIKey string) (internal.RecordReader, error) {
|
||||||
|
switch platform {
|
||||||
|
case "trading212":
|
||||||
|
return trading212.NewRecordReader(os.Stdin, internal.NewOpenFIGI(&http.Client{Timeout: 5 * time.Second}, ofAPIKey)), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unsupported platform: %s", platform)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -13,9 +14,12 @@ import (
|
|||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OpenFIGI is a small adapter for the openfigi.com api
|
var OpenFIGIAPIKeyHeader = http.CanonicalHeaderKey("X-OPENFIGI-APIKEY")
|
||||||
|
|
||||||
|
// OpenFIGI is a small adapter for the openfigi.com api.
|
||||||
type OpenFIGI struct {
|
type OpenFIGI struct {
|
||||||
client *http.Client
|
client *http.Client
|
||||||
|
apiKey string
|
||||||
mappingLimiter *rate.Limiter
|
mappingLimiter *rate.Limiter
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
@@ -25,11 +29,21 @@ type OpenFIGI struct {
|
|||||||
securityTypeCache map[string]string
|
securityTypeCache map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewOpenFIGI(c *http.Client) *OpenFIGI {
|
// NewOpenFIGI creates an OpenFIGI client that uses the API key if provided
|
||||||
return &OpenFIGI{
|
func NewOpenFIGI(c *http.Client, apiKey string) *OpenFIGI {
|
||||||
client: c,
|
// Rate limits as per https://www.openfigi.com/api/documentation#rate-limits
|
||||||
mappingLimiter: rate.NewLimiter(rate.Every(time.Minute), 25), // https://www.openfigi.com/api/documentation#rate-limits
|
limiter := rate.NewLimiter(rate.Every(time.Minute), 25)
|
||||||
|
if len(apiKey) > 0 {
|
||||||
|
slog.Debug("OpenFIGI client: created with API Key rate limits")
|
||||||
|
limiter = rate.NewLimiter(rate.Every(time.Second*6), 25)
|
||||||
|
} else {
|
||||||
|
slog.Debug("OpenFIGI client: created with puplic rate limits")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &OpenFIGI{
|
||||||
|
client: c,
|
||||||
|
apiKey: apiKey,
|
||||||
|
mappingLimiter: limiter,
|
||||||
securityTypeCache: make(map[string]string),
|
securityTypeCache: make(map[string]string),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -38,10 +52,16 @@ func (of *OpenFIGI) SecurityTypeByISIN(ctx context.Context, isin string) (string
|
|||||||
of.mu.RLock()
|
of.mu.RLock()
|
||||||
if secType, ok := of.securityTypeCache[isin]; ok {
|
if secType, ok := of.securityTypeCache[isin]; ok {
|
||||||
of.mu.RUnlock()
|
of.mu.RUnlock()
|
||||||
|
slog.Debug("OpenFIGI client: SecurityTypeByISIN cache hit",
|
||||||
|
slog.String("isin", isin),
|
||||||
|
slog.String("security_type", secType))
|
||||||
return secType, nil
|
return secType, nil
|
||||||
}
|
}
|
||||||
of.mu.RUnlock()
|
of.mu.RUnlock()
|
||||||
|
|
||||||
|
slog.Debug("OpenFIGI client: SecurityTypeByISIN cache miss",
|
||||||
|
slog.String("isin", isin))
|
||||||
|
|
||||||
of.mu.Lock()
|
of.mu.Lock()
|
||||||
defer of.mu.Unlock()
|
defer of.mu.Unlock()
|
||||||
|
|
||||||
@@ -71,6 +91,14 @@ func (of *OpenFIGI) SecurityTypeByISIN(ctx context.Context, isin string) (string
|
|||||||
|
|
||||||
req.Header.Add("Content-Type", "application/json")
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
|
||||||
|
if len(of.apiKey) > 0 {
|
||||||
|
req.Header.Add(OpenFIGIAPIKeyHeader, of.apiKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !of.mappingLimiter.Allow() {
|
||||||
|
slog.Debug("OpenFIGI client: mapping limiter waiting for rate limiter capacity")
|
||||||
|
}
|
||||||
|
|
||||||
err = of.mappingLimiter.Wait(ctx)
|
err = of.mappingLimiter.Wait(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("wait for mapping request capacity: %w", err)
|
return "", fmt.Errorf("wait for mapping request capacity: %w", err)
|
||||||
@@ -109,6 +137,10 @@ func (of *OpenFIGI) SecurityTypeByISIN(ctx context.Context, isin string) (string
|
|||||||
|
|
||||||
of.securityTypeCache[isin] = secType
|
of.securityTypeCache[isin] = secType
|
||||||
|
|
||||||
|
slog.Debug("OpenFIGI client: SecurityTypeByISIN cached mapping",
|
||||||
|
slog.String("isin", isin),
|
||||||
|
slog.String("security_type", secType))
|
||||||
|
|
||||||
return secType, nil
|
return secType, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -110,7 +110,7 @@ func TestOpenFIGI_SecurityTypeByISIN(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
of := internal.NewOpenFIGI(tt.client)
|
of := internal.NewOpenFIGI(tt.client, "")
|
||||||
|
|
||||||
got, gotErr := of.SecurityTypeByISIN(context.Background(), tt.isin)
|
got, gotErr := of.SecurityTypeByISIN(context.Background(), tt.isin)
|
||||||
if gotErr != nil {
|
if gotErr != nil {
|
||||||
@@ -145,7 +145,7 @@ func TestOpenFIGI_SecurityTypeByISIN_Cache(t *testing.T) {
|
|||||||
}, nil
|
}, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
of := internal.NewOpenFIGI(c)
|
of := internal.NewOpenFIGI(c, "")
|
||||||
|
|
||||||
got, gotErr := of.SecurityTypeByISIN(t.Context(), "NL0000235190")
|
got, gotErr := of.SecurityTypeByISIN(t.Context(), "NL0000235190")
|
||||||
if gotErr != nil {
|
if gotErr != nil {
|
||||||
@@ -166,6 +166,55 @@ func TestOpenFIGI_SecurityTypeByISIN_Cache(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestOpenFIGI_SecurityTypeByISIN_APIKey(t *testing.T) {
|
||||||
|
t.Run("with API key", func(t *testing.T) {
|
||||||
|
wantAPIKey := "123abc-456xyz"
|
||||||
|
|
||||||
|
c := NewTestClient(t, func(req *http.Request) (*http.Response, error) {
|
||||||
|
value, ok := req.Header[internal.OpenFIGIAPIKeyHeader]
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("want %q header but got none: %v", internal.OpenFIGIAPIKeyHeader, req.Header)
|
||||||
|
}
|
||||||
|
if len(value) != 1 {
|
||||||
|
t.Fatalf("want exactly one %q header value but got %d", internal.OpenFIGIAPIKeyHeader, len(value))
|
||||||
|
}
|
||||||
|
if value[0] != wantAPIKey {
|
||||||
|
t.Fatalf("want %q header value %q but got %q", internal.OpenFIGIAPIKeyHeader, wantAPIKey, value[0])
|
||||||
|
}
|
||||||
|
return &http.Response{
|
||||||
|
Status: http.StatusText(http.StatusOK),
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Body: io.NopCloser(bytes.NewBufferString(`[{"data":[{"securityType":"Common Stock"}]}]`)),
|
||||||
|
}, nil
|
||||||
|
})
|
||||||
|
of := internal.NewOpenFIGI(c, wantAPIKey)
|
||||||
|
|
||||||
|
_, err := of.SecurityTypeByISIN(t.Context(), "US1234567890")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("want success but got an error: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("without API key", func(t *testing.T) {
|
||||||
|
c := NewTestClient(t, func(req *http.Request) (*http.Response, error) {
|
||||||
|
_, ok := req.Header[internal.OpenFIGIAPIKeyHeader]
|
||||||
|
if ok {
|
||||||
|
t.Fatalf("want no %s header but got one", internal.OpenFIGIAPIKeyHeader)
|
||||||
|
}
|
||||||
|
return &http.Response{
|
||||||
|
Status: http.StatusText(http.StatusOK),
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Body: io.NopCloser(bytes.NewBufferString(`[{"data":[{"securityType":"Common Stock"}]}]`)),
|
||||||
|
}, nil
|
||||||
|
})
|
||||||
|
of := internal.NewOpenFIGI(c, "")
|
||||||
|
_, err := of.SecurityTypeByISIN(t.Context(), "US1234567890")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("want success but got an error: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
type RoundTripFunc func(req *http.Request) (*http.Response, error)
|
type RoundTripFunc func(req *http.Request) (*http.Response, error)
|
||||||
|
|
||||||
func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
|
func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log/slog"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/shopspring/decimal"
|
"github.com/shopspring/decimal"
|
||||||
@@ -54,14 +55,25 @@ type ReportWriter interface {
|
|||||||
type Selector func(Record) bool
|
type Selector func(Record) bool
|
||||||
|
|
||||||
// BuildReport reads records from a RecordReader and, if the record passes the Selector, it is
|
// BuildReport reads records from a RecordReader and, if the record passes the Selector, it is
|
||||||
// processed into the report
|
// processed into the ReportWriter.
|
||||||
func BuildReport(ctx context.Context, reader RecordReader, writer ReportWriter, s Selector) error {
|
func BuildReport(ctx context.Context, reader RecordReader, writer ReportWriter, sel Selector) error {
|
||||||
buys := make(map[string]*FillerQueue)
|
buys := make(map[string]*FillerQueue)
|
||||||
|
|
||||||
|
var buysCount, sellsCount int64
|
||||||
|
var lastTimestamp time.Time
|
||||||
|
progTicker := time.NewTicker(10 * time.Second)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
|
case <-progTicker.C:
|
||||||
|
slog.InfoContext(ctx, "Progress update",
|
||||||
|
slog.Int64("total_records", buysCount+sellsCount),
|
||||||
|
slog.Int64("sell_records", sellsCount),
|
||||||
|
slog.Int64("buy_records", buysCount),
|
||||||
|
slog.Time("last_record_timestamp", lastTimestamp),
|
||||||
|
)
|
||||||
default:
|
default:
|
||||||
rec, err := reader.ReadRecord(ctx)
|
rec, err := reader.ReadRecord(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -71,30 +83,52 @@ func BuildReport(ctx context.Context, reader RecordReader, writer ReportWriter,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s(rec) {
|
if rec.Side().IsBuy() {
|
||||||
continue
|
buysCount++
|
||||||
|
} else {
|
||||||
|
sellsCount++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lastTimestamp = rec.Timestamp()
|
||||||
|
|
||||||
buyQueue, ok := buys[rec.Symbol()]
|
buyQueue, ok := buys[rec.Symbol()]
|
||||||
if !ok {
|
if !ok {
|
||||||
buyQueue = new(FillerQueue)
|
buyQueue = new(FillerQueue)
|
||||||
buys[rec.Symbol()] = buyQueue
|
buys[rec.Symbol()] = buyQueue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = processRecord(ctx, buyQueue, rec, writer)
|
err = processRecord(ctx, buyQueue, rec, sel, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("processing record: %w", err)
|
return fmt.Errorf("processing record: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func processRecord(ctx context.Context, q *FillerQueue, rec Record, writer ReportWriter) error {
|
// processRecord either adds buys to the queue or consumes buys from the queue when processing a
|
||||||
|
// sell record.
|
||||||
|
// Selectors are only applied for sells for performance reasons. It's much cheaper to just accumulate
|
||||||
|
// buys and only actually inspect a record once a sell happens
|
||||||
|
func processRecord(ctx context.Context, q *FillerQueue, rec Record, sel Selector, writer ReportWriter) error {
|
||||||
|
slog.Debug("Report: processing record",
|
||||||
|
slog.String("symbol", rec.Symbol()),
|
||||||
|
slog.String("side", rec.Side().String()),
|
||||||
|
)
|
||||||
|
|
||||||
switch rec.Side() {
|
switch rec.Side() {
|
||||||
case SideBuy:
|
case SideBuy:
|
||||||
q.Push(NewFiller(rec))
|
q.Push(NewFiller(rec))
|
||||||
|
|
||||||
case SideSell:
|
case SideSell:
|
||||||
|
if !sel(rec) {
|
||||||
|
slog.Debug("Report: skipping record",
|
||||||
|
slog.String("symbol", rec.Symbol()),
|
||||||
|
slog.String("side", rec.Side().String()),
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
unmatchedQty := rec.Quantity()
|
unmatchedQty := rec.Quantity()
|
||||||
|
|
||||||
for unmatchedQty.IsPositive() {
|
for unmatchedQty.IsPositive() {
|
||||||
|
|||||||
@@ -158,7 +158,7 @@ func figiNatureGetter(ctx context.Context, of *internal.OpenFIGI, isin string) f
|
|||||||
}
|
}
|
||||||
|
|
||||||
switch secType {
|
switch secType {
|
||||||
case "Common Stock":
|
case "Common Stock", "ADR", "REIT":
|
||||||
return internal.NatureG01
|
return internal.NatureG01
|
||||||
case "ETP":
|
case "ETP":
|
||||||
return internal.NatureG20
|
return internal.NatureG20
|
||||||
|
|||||||
@@ -223,7 +223,7 @@ func NewFigiClientSecurityTypeStub(t testing.TB, securityType string) *internal.
|
|||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
return internal.NewOpenFIGI(c)
|
return internal.NewOpenFIGI(c, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFigiClientErrorStub(t testing.TB, err error) *internal.OpenFIGI {
|
func NewFigiClientErrorStub(t testing.TB, err error) *internal.OpenFIGI {
|
||||||
@@ -236,5 +236,5 @@ func NewFigiClientErrorStub(t testing.TB, err error) *internal.OpenFIGI {
|
|||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
return internal.NewOpenFIGI(c)
|
return internal.NewOpenFIGI(c, "")
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user