Real-Time Head-and-Shoulders Pattern Detection for AI Trading Strategies

Every trader knows the Head-and-Shoulders pattern. Few can detect it in real-time without looking at charts. In this article, I’ll show you how a simple CNN can identify this reversal pattern with 97% accuracy — and more importantly, how to use it as a risk control signal rather than a trading signal.

I recently co-authored the book Hands-On AI Trading with Python, QuantConnect, and AWS (Wiley, 2025). It’s a hands-on guide bridging AI and quantitative trading, with fully implemented strategies you can run, audit, and extend — the code is available in the book’s GitHub repository.

Book cover titled Hands-On AI Trading with authors Jiri Pik, Ernest P. Chan, Jared Broad, Philip Sun, and Vivek Singh. Subtitle mentions Python, QuantConnect, and AWS. Background features circuit-like lines. Publisher: Wiley.

In this article, I’ll introduce the breadth of strategies in the book and then go deep on one specific example: a CNN-based chart pattern recognition model for the Head-and-Shoulders reversal pattern. The goal is to take the “offline demo” version and upgrade it into a live, streaming inference pipeline that acts as a tactical overlay for portfolio allocation.

What’s in the Book: A Catalog of AI Trading Strategies

The book is intentionally practical designed to teach intuition:

  • complete code available
  • all strategies presented in end-to-end pipelines (data → features → model → backtest → trading logic), and
  • all strategies apply modern techniques spanning classic ML, deep learning, time-series, and NLP.

Here’s the complete list of strategy examples and the primary method(s) used in each:

A table compares strategy examples, such as ML Trend Scanning and Factor Preprocessing, with their core AI methods, including semi-supervised labeling, PCA, SVM regression, HMMs, decision trees, and neural networks.

Why Head-and-Shoulders + CNN Is Worth Your Time

A line graph showing a head and shoulders chart pattern, with the highest peak labeled Head, two smaller peaks on each side labeled Shoulder, and a dotted line labeled Neckline running below the valleys.

The Head-and-Shoulders top is a classic reversal shape:

  • left shoulder (local peak)
  • head (higher peak)
  • right shoulder (lower peak)
  • neckline area (support that often breaks during reversal)

The problem in systematic trading is that hand-coded pattern rules are brittle:

  • too many thresholds
  • too many edge cases
  • poor portability across assets/timeframes.

A CNN can learn shape invariances directly from the data: relative peak geometry, local slope, symmetry, and “neckline-ish” structure.

In the book, the model is trained primarily on synthetic examples (to get enough labeled positives) and then deployed as a classifier.

This article upgrades that into something closer to production:

  1. Streaming OHLC feature windowing
  2. Real-time inference
  3. A tactical allocation overlay that scales risk up/down

Design: CNN as a Tactical Overlay (Not a Standalone Strategy)

In my experience, pattern detectors are most useful when they modify an existing alpha, rather than trying to trade the detector directly.

Examples of overlay behaviors:

  • Suppress long exposure when a bearish head-and-shoulders probability is high
  • Downweight risk (position sizing multiplier) while a reversal formation is likely
  • Require stricter entry criteria for your primary strategy when pattern probability exceeds a threshold
  • Conditional hedging: add a hedge sleeve if probability stays high for multiple bars (persistence)

In other words: the CNN outputs a probability, and we convert it into a risk control signal.

A Practical Overlay Rule

Let:

  • p_hs = model probability of bearish head-and-shoulders
  • w_base = your baseline allocation (from your main strategy)

Define a gating multiplier:

  • gate = clip(1 - p_hs, 0.0, 1.0)

Then the final allocation becomes:

  • w_final = w_base * gate

Interpretation:

  • if p_hs = 0.0, no suppression (gate = 1.0)
  • if p_hs = 0.8, your position is cut by 80%
  • if p_hs = 1.0, you go flat (or near-flat)

You can also add persistence:

  • only apply suppression if p_hs > 0.8 for k consecutive bars

Complete Code: Train + Stream Inference + Overlay

This is a self-contained PyTorch implementation that includes:

  • synthetic data generation (pattern vs non-pattern)
  • CNN architecture
  • training loop
  • streaming inference loop
  • overlay sizing rule
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt


# -----------------------------
# 1) Synthetic data generation
# -----------------------------
def gen_head_and_shoulders(window=64, noise_std=0.01, amp=0.08, seed=None):
    """
    Generate a synthetic Head-and-Shoulders top pattern.
    Output: normalized 1D price series length=window.
    """
    if seed is not None:
        np.random.seed(seed)

    x = np.linspace(0, 1, window)

    # Define three peaks: left shoulder, head, right shoulder
    # Use Gaussian bumps for smoothness
    def bump(center, width, height):
        return height * np.exp(-0.5 * ((x - center) / width) ** 2)

    baseline = 1.0 + 0.02 * np.sin(2 * np.pi * x)  # mild waviness
    left = bump(center=0.30, width=0.06, height=amp * 0.7)
    head = bump(center=0.50, width=0.07, height=amp * 1.0)
    right = bump(center=0.70, width=0.06, height=amp * 0.7)

    # Add a gentle down drift after the head (to hint reversal)
    drift = -0.03 * np.clip(x - 0.55, 0, 1)

    y = baseline + left + head + right + drift
    y += np.random.normal(0, noise_std, size=window)

    # Normalize to start at 1.0 (scale invariance)
    y = y / y[0]
    return y.astype(np.float32)


def gen_non_pattern(window=64, noise_std=0.01, seed=None):
    """
    Generate a synthetic non-pattern series (random walk + drift + noise).
    """
    if seed is not None:
        np.random.seed(seed)

    steps = np.random.normal(0, noise_std, size=window)
    drift = np.linspace(0, np.random.uniform(-0.01, 0.01), window)
    y = 1.0 + np.cumsum(steps) + drift
    y = y / y[0]
    return y.astype(np.float32)


def build_dataset(n=5000, window=64, pos_frac=0.5):
    n_pos = int(n * pos_frac)
    n_neg = n - n_pos

    X = []
    y = []

    for i in range(n_pos):
        X.append(gen_head_and_shoulders(window=window, seed=i))
        y.append(1)

    for i in range(n_neg):
        X.append(gen_non_pattern(window=window, seed=10_000 + i))
        y.append(0)

    X = np.stack(X, axis=0)  # (N, window)
    y = np.array(y, dtype=np.float32)  # (N,)
    # Shuffle
    idx = np.random.permutation(len(y))
    return X[idx], y[idx]


# -----------------------------
# 2) CNN model
# -----------------------------
class HSCNN(nn.Module):
    def __init__(self, window=64):
        super().__init__()
        self.conv1 = nn.Conv1d(1, 16, kernel_size=7, padding=3)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=5, padding=2)
        self.pool = nn.MaxPool1d(kernel_size=2)

        # After 2 pools, length halves twice: window -> window/2 -> window/4
        flat_len = 32 * (window // 4)
        self.fc1 = nn.Linear(flat_len, 64)
        self.fc2 = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = self.pool(x)
        x = torch.relu(self.conv2(x))
        x = self.pool(x)

        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        return x


# -----------------------------
# 3) Train
# -----------------------------
def train_model(window=64, epochs=8, batch_size=128, lr=1e-3, device="cpu"):
    X, y = build_dataset(n=6000, window=window, pos_frac=0.5)

    # Train/test split
    split = int(0.85 * len(y))
    X_train, X_test = X[:split], X[split:]
    y_train, y_test = y[:split], y[split:]

    X_train = torch.tensor(X_train).view(-1, 1, window).to(device)
    y_train = torch.tensor(y_train).view(-1, 1).to(device)

    X_test = torch.tensor(X_test).view(-1, 1, window).to(device)
    y_test = torch.tensor(y_test).view(-1, 1).to(device)

    model = HSCNN(window=window).to(device)
    opt = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.BCELoss()

    for ep in range(1, epochs + 1):
        model.train()
        perm = torch.randperm(X_train.size(0))
        X_train_shuf = X_train[perm]
        y_train_shuf = y_train[perm]

        for i in range(0, X_train.size(0), batch_size):
            xb = X_train_shuf[i : i + batch_size]
            yb = y_train_shuf[i : i + batch_size]

            opt.zero_grad()
            pred = model(xb)
            loss = loss_fn(pred, yb)
            loss.backward()
            opt.step()

        # Evaluate
        model.eval()
        with torch.no_grad():
            pred = model(X_test)
            acc = ((pred > 0.5) == (y_test > 0.5)).float().mean().item()
            print(f"epoch={ep} loss={loss.item():.4f} test_acc={acc:.4f}")

    return model


# -----------------------------
# 4) Streaming inference + overlay
# -----------------------------
def overlay_gate(p_hs, lo=0.0, hi=1.0):
    """
    Simple gating rule: reduce exposure as pattern probability rises.
    """
    gate = 1.0 - float(p_hs)
    return max(lo, min(hi, gate))


def live_price_stream():
    """
    Stub generator. Replace with real bars.
    Here: generate a time series where a H&S pattern occasionally appears.
    """
    # start with random walk
    p = 1.0
    for t in range(100):
        p *= 1.0 + np.random.normal(0, 0.002)
        yield float(p)

    # insert a synthetic head-and-shoulders formation using SAME parameters as training
    # Use window=64 and same amp/noise as training for reliable detection
    hs = gen_head_and_shoulders(window=64, noise_std=0.01, amp=0.08)
    base = p
    for v in hs:
        yield float(base * v)

    # short gap then another H&S pattern
    p = base * hs[-1]
    for t in range(50):
        p *= 1.0 + np.random.normal(0, 0.002)
        yield float(p)

    # second H&S pattern
    hs2 = gen_head_and_shoulders(window=64, noise_std=0.01, amp=0.08, seed=999)
    base = p
    for v in hs2:
        yield float(base * v)

    # continue random walk
    p = base * hs2[-1]
    for t in range(100):
        p *= 1.0 + np.random.normal(0, 0.002)
        yield float(p)


def run_live(model, window=64, threshold=0.60, device="cpu"):
    """
    Run live inference with lower threshold and immediate gating for demonstration.
    Returns data for visualization.
    """
    model.eval()

    buf = []
    base_allocation = 1.0  # imagine your main strategy wants to be 100% long

    # optional persistence: require consecutive hits
    consecutive = 0
    need_k = 2  # reduced from 3 for faster response

    bar_num = 0
    detected_count = 0

    # Collect data for visualization
    prices = []
    probabilities = []
    allocations = []
    detection_bars = []

    for px in live_price_stream():
        buf.append(px)
        bar_num += 1
        prices.append(px)

        if len(buf) < window:
            probabilities.append(0.0)
            allocations.append(1.0)
            continue

        w = np.array(buf[-window:], dtype=np.float32)
        w = w / w[0]  # normalize

        inp = torch.tensor(w).view(1, 1, window).to(device)

        with torch.no_grad():
            p_hs = model(inp).item()

        probabilities.append(p_hs)

        if p_hs >= threshold:
            consecutive += 1
        else:
            consecutive = 0

        # Apply overlay if pattern detected (even on first detection for visibility)
        if p_hs >= threshold:
            gate = overlay_gate(p_hs)
            detected_count += 1
            detection_bars.append(bar_num - 1)  # 0-indexed for plotting
        else:
            gate = 1.0

        allocations.append(gate)
        final_allocation = base_allocation * gate

        # Only print when something interesting happens (pattern detected or every 20 bars)
        if p_hs >= threshold or bar_num % 50 == 0:
            status = "*** H&S DETECTED ***" if p_hs >= threshold else ""
            print(
                f"bar={bar_num:4d} price={px:.4f} p_hs={p_hs:.3f} consecutive={consecutive} "
                f"gate={gate:.2f} alloc={final_allocation:.2f} {status}"
            )

    print(
        f"\n=== Summary: {detected_count} bars with H&S pattern detection (p >= {threshold}) ==="
    )

    return {
        "prices": np.array(prices),
        "probabilities": np.array(probabilities),
        "allocations": np.array(allocations),
        "detection_bars": detection_bars,
        "threshold": threshold,
    }


def plot_detection_results(results, save_path=None):
    """
    Visualize price series with H&S pattern detections highlighted.
    """
    prices = results["prices"]
    probs = results["probabilities"]
    allocs = results["allocations"]
    detections = results["detection_bars"]
    threshold = results["threshold"]

    fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)
    bars = np.arange(len(prices))

    # --- Panel 1: Price with detection highlighting ---
    ax1 = axes[0]
    ax1.plot(bars, prices, "b-", linewidth=1, label="Price")

    # Highlight detection regions
    if detections:
        detection_prices = prices[detections]
        ax1.scatter(
            detections,
            detection_prices,
            c="red",
            s=20,
            alpha=0.7,
            label=f"H&S Detected (p ≥ {threshold})",
            zorder=5,
        )

    ax1.set_ylabel("Price", fontsize=11)
    ax1.set_title(
        "Real-Time Head-and-Shoulders Detection with CNN Overlay",
        fontsize=14,
        fontweight="bold",
    )
    ax1.legend(loc="upper left")
    ax1.grid(True, alpha=0.3)

    # --- Panel 2: Detection probability ---
    ax2 = axes[1]
    ax2.fill_between(bars, 0, probs, alpha=0.4, color="orange", label="H&S Probability")
    ax2.axhline(
        y=threshold,
        color="red",
        linestyle="--",
        linewidth=1.5,
        label=f"Threshold ({threshold})",
    )
    ax2.set_ylabel("P(H&S)", fontsize=11)
    ax2.set_ylim(0, 1)
    ax2.legend(loc="upper left")
    ax2.grid(True, alpha=0.3)

    # --- Panel 3: Allocation gate ---
    ax3 = axes[2]
    ax3.fill_between(bars, 0, allocs, alpha=0.5, color="green", label="Allocation Gate")
    ax3.set_ylabel("Allocation", fontsize=11)
    ax3.set_xlabel("Bar", fontsize=11)
    ax3.set_ylim(0, 1.1)
    ax3.legend(loc="lower left")
    ax3.grid(True, alpha=0.3)

    # Add annotations for key events
    ax3.annotate("Full exposure\n(no pattern)", xy=(50, 1.0), fontsize=9, ha="center")
    if detections:
        mid_detect = detections[len(detections) // 2]
        ax3.annotate(
            "Reduced exposure\n(pattern detected)",
            xy=(mid_detect, allocs[mid_detect]),
            xytext=(mid_detect + 20, 0.5),
            fontsize=9,
            ha="left",
            arrowprops=dict(arrowstyle="->", color="black", lw=0.8),
        )

    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, dpi=150, bbox_inches="tight")
        print(f"\nChart saved to: {save_path}")

    plt.show()
    return fig


if __name__ == "__main__":
    device = "cpu"
    window = 64
    print("Training Head & Shoulders CNN detector...")
    model = train_model(
        window=window, epochs=8, device=device
    )  # more epochs for better accuracy
    print("\n--- Running live simulation with H&S pattern injection ---\n")
    results = run_live(model, window=window, threshold=0.60, device=device)

    # Generate visualization
    print("\nGenerating visualization...")
    plot_detection_results(results, save_path="hs_detection_chart.png")

Sample output:

A three-panel chart shows a stock price with detected head-and-shoulders patterns, their probability over a threshold, and corresponding allocation changes that reduce exposure during detected patterns.
bar= 159 price=0.9570 p_hs=0.687 gate=0.31 alloc=0.31 *** H&S DETECTED ***
bar= 160 price=0.9469 p_hs=0.781 gate=0.22 alloc=0.22 *** H&S DETECTED ***
bar= 163 price=0.9685 p_hs=0.932 gate=0.07 alloc=0.07 *** H&S DETECTED ***

How to Use This in a Trading System

Implementation pattern:

  1. Train offline (research environment, CI job, or scheduled retrain)
  2. Save weights (e.g., state_dict) to your deployment environment
  3. Load model at strategy initialization
  4. Maintain a rolling window of bars (close or engineered features)
  5. Run inference on each bar close
  6. Convert probability into an overlay gate (risk multiplier)
  7. Apply gate to:
  • position sizing
  • trade entry filters
  • hedging triggers

Practical Tips That Matter in Production

  • Use probability, not a hard yes/no signal. A probability is a richer control knob.
  • Add persistence. Require k consecutive bars above threshold to reduce noise.
  • Normalize consistently. If training used normalization-by-first-value, do the same live.
  • Track false positives. Plot detections over time and spot when you’re suppressing good trades.
  • Retrain intentionally. Synthetic data helps, but augment with real market examples when available.

Key Takeaways:

  • CNNs can learn chart patterns without hand-coded rules
  • Use pattern probability as a risk multiplier, not a binary signal
  • Require persistence (k consecutive detections) to reduce noise
  • Train on synthetic data, validate on real market data

Closing

A CNN pattern detector becomes genuinely useful when you treat it as a risk-aware overlay rather than a standalone “pattern trading strategy.” The payoff is usually not raw return, but better drawdown behavior and fewer catastrophic entries during structural reversals.

If you want to explore the full set of AI trading strategies from the book, the code is in: https://github.com/QuantConnect/HandsOnAITradingBook

And the book is here: https://amzn.to/4fI0hkU

Author

keyboard_arrow_up
Index