EP11 intermediate

Building a Data Source Middleware for Financial Analysis

A 5-layer architecture that unifies scattered financial data providers behind one interface — with fallbacks, rate limits, and anti-blocking built in.

Financial data is a mess. Not the data itself — the infrastructure around it. You need real-time quotes from one provider, fundamental data from another, news from three more, and research reports from whoever will sell them to you. Each provider has its own API format, authentication scheme, rate limits, and failure modes.

I’ve connected to over a dozen data sources in the past year. The pattern is always the same: write an adapter, handle auth, manage rate limits, add caching, write fallback logic when the primary goes down at 2am. Every time, from scratch.

The middleware approach solves this once.

The 5-Layer Architecture

Instead of scattering data calls throughout your codebase, you stack them into layers. Each layer handles one category of financial data, with multiple providers behind it.

┌─────────────────────────────────────────────┐
│            Your Application / Agent          │
├─────────────────────────────────────────────┤
│              Unified Data API                │
├──────┬──────┬──────┬──────┬────────────────┤
│  L1  │  L2  │  L3  │  L4  │      L5        │
│Quote │Report│ News │ Fund │  Announce      │
├──────┴──────┴──────┴──────┴────────────────┤
│         Cache + Rate Limiter                │
├─────────────────────────────────────────────┤
│       Provider Adapters (pluggable)         │
└─────────────────────────────────────────────┘

Layer 1: Real-Time Quotes

Market data at the speed you need it. This layer supports two modes:

  • REST polling: Hit an endpoint every N seconds. Simple, works everywhere, but you’re limited by rate caps. Good enough for swing trading or daily analysis.
  • WebSocket streaming: Persistent connection, sub-second updates. Necessary for intraday strategies. More complex to manage (reconnection logic, heartbeats, message buffering).

The key provider criteria: latency, reliability, and whether they block your IP after heavy usage.

Layer 2: Research Reports

Aggregated analyst reports, earnings estimates, price targets. These come from specialized providers and are often the most expensive data category.

The middleware normalizes report formats. Provider A gives you a PDF link with metadata in XML. Provider B gives you structured JSON. Your application sees one consistent ResearchReport object regardless of source.

Layer 3: News and Sentiment

RSS feeds, API aggregation, and increasingly, pre-computed sentiment scores. Three sub-categories:

  • Wire services: Reuters, AP, Bloomberg — fastest but most expensive
  • Aggregators: News API, financial news feeds — 5-30 minute delay, much cheaper
  • Social/alternative: Reddit, StockTwits, Twitter — noisy but occasionally predictive

I keep all three tiers active. Wire services for time-sensitive signals, aggregators for daily summaries, social for sentiment outliers.

Layer 4: Fundamentals

Financial statements, ratios, filings. The most stable layer — this data changes quarterly, not by the second.

  • Income statements, balance sheets, cash flow
  • Valuation ratios (P/E, P/B, EV/EBITDA)
  • SEC/regulatory filings (10-K, 10-Q, 8-K)

Caching is aggressive here. No reason to fetch the same quarterly report twice.

Layer 5: Announcements

Corporate actions, dividend declarations, stock splits, insider transactions, regulatory filings. This data is event-driven — you need it the moment it’s published, but it doesn’t change after that.

A webhook or polling system monitors announcement feeds and pushes events to your application.

Design Principles

Multi-Source Redundancy

Every layer has at least two providers. If the primary goes down, the fallback kicks in automatically. No manual intervention, no alerts at 3am.

class QuoteLayer:
    def __init__(self):
        self.providers = [
            PrimaryQuoteProvider(api_key=os.environ["PRIMARY_KEY"]),
            FallbackQuoteProvider(api_key=os.environ["FALLBACK_KEY"]),
        ]

    def get_quote(self, symbol: str) -> Quote:
        for provider in self.providers:
            try:
                return provider.fetch(symbol)
            except (Timeout, RateLimitExceeded, ServiceUnavailable):
                continue
        raise AllProvidersFailedError(symbol)

The error types matter. A 400 Bad Request means your query is wrong — don’t retry on the next provider. A 429 Too Many Requests or 503 means the provider is temporarily unavailable — try the next one.

Rate Limit Management

Each provider has different limits. Some cap by requests-per-second, others by requests-per-day, others by concurrent connections.

class RateLimiter:
    def __init__(self, max_per_second: int, max_per_day: int):
        self.second_limiter = TokenBucket(max_per_second, refill_rate=max_per_second)
        self.day_limiter = TokenBucket(max_per_day, refill_rate=max_per_day / 86400)

    def acquire(self) -> bool:
        return self.second_limiter.consume(1) and self.day_limiter.consume(1)

Track usage per provider. When Provider A is near its daily cap, route new requests to Provider B preemptively — don’t wait for the 429.

Caching Strategy

Not all data deserves the same cache TTL:

Data TypeCache TTLReason
Real-time quotes1-5 secondsStale prices kill strategies
News articles30 minutesUnlikely to change after publish
Research reports24 hoursUpdated daily at most
Fundamentals7 daysQuarterly data
Historical pricesForeverYesterday’s close doesn’t change

Redis works well here. Fast reads, TTL support built in, and you can run it alongside your application on the same machine.

Anti-Blocking Strategies

This is the part nobody talks about in tutorials. Aggressive API usage gets you blocked. Here’s how to avoid it.

TCP protocol over HTTP. Some data providers offer TCP socket connections alongside their REST APIs. TCP connections don’t carry HTTP headers, user-agent strings, or cookies — so the provider’s anti-bot systems have much less to fingerprint. No HTTP means no IP blocking based on request patterns. If a provider offers a TCP feed, use it.

Request throttling with jitter. Don’t fire requests at exact intervals. A request every 1.000 seconds looks like a bot. A request every 0.8-1.3 seconds (random jitter) looks like a human.

import random
import time

def throttled_request(func, min_delay=0.8, max_delay=1.3):
    result = func()
    time.sleep(random.uniform(min_delay, max_delay))
    return result

Rotating user agents. If you must use HTTP, rotate user-agent strings across a pool of real browser signatures. Update the pool quarterly — old user agents are a dead giveaway.

IP rotation for non-authenticated endpoints. Some free data sources don’t offer API keys, just public endpoints with IP-based rate limits. A rotating proxy pool (residential, not datacenter) can distribute load. But be honest about what you’re doing — if a provider’s ToS prohibits scraping, don’t scrape.

Evaluating Data Providers

Before integrating a new provider, score it on five dimensions:

CriterionWeightWhat to Check
Reliability30%Historical uptime, status page, incident reports
Speed20%Median latency, p99 latency, geographic proximity
Completeness20%Market coverage, history depth, corporate actions
Cost20%Per-call pricing, monthly caps, overage fees
Anti-blocking risk10%ToS clarity, rate limit transparency, ban history

Run a 2-week trial with real production load before committing. Weekend and holiday behavior matters — some providers have reduced capacity outside trading hours.

Integration as a Claude Code Skill

The entire middleware can be packaged as a single-file Claude Code skill. Your AI agent calls one function — get_data(symbol, data_type) — and the middleware handles provider selection, caching, rate limits, and fallbacks invisibly.

# skill: market-data
# description: Unified financial data access

def get_data(symbol: str, data_type: str, **kwargs) -> dict:
    """
    data_type: "quote" | "fundamental" | "news" | "report" | "announcement"
    Returns normalized data regardless of underlying provider.
    """
    layer = LAYERS[data_type]
    cache_key = f"{data_type}:{symbol}:{hash(frozenset(kwargs.items()))}"

    cached = cache.get(cache_key)
    if cached:
        return cached

    result = layer.fetch(symbol, **kwargs)
    cache.set(cache_key, result, ttl=TTL_MAP[data_type])
    return result

The agent doesn’t need to know which provider served the data. It doesn’t need to handle retries or rate limits. It just asks for data and gets it.

The Payoff

Without middleware, adding a new data source means updating every file that touches financial data. With middleware, you write one adapter, register it in the provider list, and everything else keeps working.

I’ve added six providers to my middleware over the past year. Average integration time: about 2 hours per provider. The first time I built it took a full weekend. Every provider after that was just a new adapter class conforming to the existing interface.

Start with two layers (quotes and fundamentals) and two providers per layer. Expand when you need to. The architecture scales — you don’t have to build all five layers on day one.