14. Timeseries Anomaly Explanation with LLMs#

14.1. Overview#

This notebook demonstrates a pipeline for identifying, contextualizing, and explaining anomalies in timeseries indicator data using Large Language Models (LLMs). The approach generalizes to any timeseries dataset (World Development Indicators, Corporate Scorecard, or custom indicators) rather than being tied to a single application.

14.1.1. Purpose#

Automatically generate verifiable explanations for significant data deviations, classifying them as:

  • External drivers — macroeconomic events, conflicts, policy reforms, disasters

  • Data errors — placeholders, ingestion issues, computation artifacts

  • Measurement system updates — rebasing, census revisions, classification changes

  • Insufficient data — when no verifiable cause can be identified
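To make the end product concrete, one explanation record might look like the sketch below. The field names are assumptions drawn from the prompt rules shown later in this notebook (plus a fifth category, modeling_artifact, used in the prompt); the actual schema produced by the pipeline may differ.

```python
# Illustrative shape of one explanation record (field names are assumptions
# based on the prompt rules in Step 5; the real output schema may differ).
example_record = {
    "indicator_id": "WB_CSC_SI_POV_UMIC",
    "geography_id": "CAN",
    "window": [2007, 2007],           # anomalies are [start, end] windows
    "is_anomaly": True,
    "classification": "data_error",   # one of the categories above
    "evidence_strength": "strong_direct",
    "explanation": "Single-year drop from 1.0 to 0.0, consistent with a "
                   "placeholder or ingestion error.",
}
print(example_record["classification"])
```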

14.1.2. Learning Objectives#

By the end of this notebook, you will understand:

  1. The canonical data interface for timeseries anomaly inputs

  2. How to adapt legacy formats (e.g., Scorecard wide format) to the canonical schema

  3. The context extraction logic for building LLM prompts

  4. How structured prompts and JSON schema constrain LLM outputs

  5. How to parse and analyze batch LLM outputs

14.1.3. Prerequisites#

  • Python 3.11+ with ai4data[anomaly] and ai4data[metadata] installed

  • API keys for OpenAI and/or Gemini (for LLM inference)

14.2. Pipeline Overview#

flowchart LR
    subgraph input [Input]
        A[Wide CSV]
        B[Anomaly Scores CSV]
    end
    subgraph adapt [Transform]
        C[Adapter]
    end
    subgraph process [Pipeline]
        D[Canonical Format]
        E[Anomaly Ranking]
        F[Context Extraction]
        G[LLM Prompts]
        H[Batch API]
        I[Parsed Output]
    end
    A --> C
    B --> C
    C --> D
    D --> E
    E --> F
    F --> G
    G --> H
    H --> I

14.3. Step 1: Understand the Data Interface#

The pipeline consumes data in a canonical long-format schema. This format is designed to be:

  • Indicator-agnostic — works for WDI, Scorecard, or custom indicators

  • Geography-agnostic — countries, regions, or any spatial unit

  • Explicit about anomaly metadata — pre-computed scores and imputation flags

14.3.1. Canonical Column Semantics#

| Column | Type | Description |
|---|---|---|
| `indicator_id` | `str` | Unique indicator code (e.g., `WB_CSC_SI_POV_UMIC`) |
| `indicator_name` | `str` | Human-readable indicator label |
| `geography_id` | `str` | Geography code (e.g., `ISL` for Iceland) |
| `geography_name` | `str` | Human-readable geography label |
| `period` | `int` | Time period (year for annual data) |
| `value` | `float` | Indicator value (NaN allowed) |
| `is_imputed` | `bool` | Whether the value was imputed |
| `anomaly_score` | `float` | Pre-computed anomaly magnitude (e.g., \|z-score\|) |
| `outlier_count` | `int` | Number of detectors that flagged this point |
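A minimal canonical-format frame built by hand makes the schema concrete (values here are illustrative, not real data):

```python
import pandas as pd

# The nine canonical columns from the table above, in order.
canonical_cols = [
    "indicator_id", "indicator_name", "geography_id", "geography_name",
    "period", "value", "is_imputed", "anomaly_score", "outlier_count",
]

df = pd.DataFrame(
    [
        ("WB_CSC_SI_POV_UMIC", "Poverty headcount ($6.85/day)", "CAN", "Canada",
         2007, 0.0, False, 3.2, 4),
        ("WB_CSC_SI_POV_UMIC", "Poverty headcount ($6.85/day)", "CAN", "Canada",
         2008, 1.0, False, 0.1, 0),
    ],
    columns=canonical_cols,
)
print(df.dtypes)
```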

14.4. Step 2: Load and Adapt Input Data#

If your data is in a legacy format (e.g., Scorecard wide format with separate anomaly scores), use the adapter to convert to canonical format.

import json
import os
from pathlib import Path

import pandas as pd
from jinja2 import Template

from ai4data.anomaly.explanation import (
    adapter_from_config,
    build_batch_file,
    extract_anomaly_contexts,
    parse_batch_output,
    run_batch,
)
from ai4data.anomaly.explanation.prompts import (
    USER_PROMPT_TEMPLATE,
    get_anomaly_response_format,
)
# Configuration: adjust paths to your data
# llm-input: processed batch requests (upload this to OpenAI/Gemini Batch API)
# llm-output: responses from the batch API after completion
os.environ["ANOMALY_DATA_DIR"] = "../../data/anomaly/CSC"
os.environ["ANOMALY_LLM_INPUT_DIR"] = "../../data/anomaly/CSC/llm-input"
os.environ["ANOMALY_LLM_OUTPUT_DIR"] = "../../data/anomaly/CSC/llm-output"

DATA_DIR = Path(os.environ.get("ANOMALY_DATA_DIR", "."))
INPUT_DIR = Path(os.environ.get("ANOMALY_LLM_INPUT_DIR", DATA_DIR / "llm-input"))
OUTPUT_DIR = Path(os.environ.get("ANOMALY_LLM_OUTPUT_DIR", DATA_DIR / "llm-output"))

# Legacy path (alternative): load a wide CSV plus a separate anomaly-scores CSV
# WIDE_PATH = DATA_DIR / "WB_CSC_WIDEF.csv"
# ANOMALY_PATH = DATA_DIR / "CSC_TOP_ANOMALIES_2026-02-06.CSV"
# if WIDE_PATH.exists() and ANOMALY_PATH.exists():
#     adapter = ScorecardWideAdapter()
#     canonical_df = adapter.load(WIDE_PATH, ANOMALY_PATH)
# else:
#     canonical_df = pd.DataFrame()

# Create the mapping for the adapter to convert the input format to the canonical format
mapping = {
    "indicator_id": "INDICATOR",
    "indicator_name": "INDICATOR_LABEL",
    "geography_id": "REF_AREA",
    "geography_name": "REF_AREA_LABEL",
    "period": "YEAR",
    "value": "VALUE",
    "is_imputed": "Imputed",
    "anomaly_score": "absZscore_zscore",  # or create from Zscore
    "outlier_count": "outlier_indicator_total",
}
adapt = adapter_from_config(mapping)
canonical_df = adapt["adapt_excel"](DATA_DIR / "CSC_ANOMALIES_2026-02-18.xlsx")

14.5. Step 3: Anomaly Identification#

We filter for series with multiple anomaly detectors in agreement (outlier_count >= 3) and rank by combined anomaly_score. This produces a shortlist of (indicator, geography) pairs to explain.

shortlist = pd.DataFrame()
max_count = 10_000
if len(canonical_df) > 0:
    # Filter: at least 3 detectors agree, exclude imputed
    filtered = canonical_df[
        (canonical_df["outlier_count"] >= 3) & (~canonical_df["is_imputed"])
    ]
    # Rank by sum of anomaly_score per (indicator, geography)
    ranked = (
        filtered.groupby(["indicator_id", "geography_id"])["anomaly_score"]
        .sum()
        .sort_values(ascending=False)
        .reset_index()
    )
    shortlist = ranked.head(max_count)  # Cap for batch processing
    print(f"Shortlist: {len(shortlist)} series")
    display(shortlist.head())
Shortlist: 870 series
            indicator_id geography_id  anomaly_score
0     WB_CSC_SI_POV_UMIC          CAN       6.754442
1  WB_CSC_EG_ELC_ACCS_ZS          SUR       6.725937
2  WB_CSC_EG_ELC_ACCS_ZS          BFA       6.668304
3     WB_CSC_SI_POV_DDAY          ESP       6.607846
4     WB_CSC_SI_POV_DDAY          ITA       6.584522

14.6. Step 4: Context Generation#

For each (indicator, geography) in the shortlist, we extract a time-windowed context around the anomaly years. Overlapping windows are merged into contiguous ranges so the LLM sees a single coherent snippet per anomaly cluster.
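The window-merging idea can be sketched as follows: given the years flagged as anomalous, collapse runs of adjacent years into [start, end] windows. Presumably extract_anomaly_contexts does something along these lines and then pads each window with period_window years of surrounding context; the helper below is an illustration, not the library's implementation.

```python
def merge_windows(years, max_gap=1):
    """Merge anomaly years into contiguous [start, end] windows.

    Years at most ``max_gap`` apart are grouped into one window.
    """
    windows = []
    for y in sorted(years):
        if windows and y - windows[-1][1] <= max_gap:
            windows[-1][1] = y            # extend the current window
        else:
            windows.append([y, y])        # start a new window
    return [tuple(w) for w in windows]

print(merge_windows([2007, 2008, 2010, 2015]))
# → [(2007, 2008), (2010, 2010), (2015, 2015)]
```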

if len(canonical_df) > 0 and len(shortlist) > 0:
    # Build name maps for context
    geo_map = canonical_df.set_index("geography_id")["geography_name"].to_dict()
    ind_map = canonical_df.set_index("indicator_id")["indicator_name"].to_dict()

    # Index by (indicator_id, geography_id) for fast lookup
    source_df = canonical_df.set_index(["indicator_id", "geography_id"]).sort_index()

    # Extract one example context
    row = shortlist.iloc[0]
    ind_id, geo_id = row["indicator_id"], row["geography_id"]
    series_df = source_df.loc[(ind_id, geo_id)].reset_index()
    contexts = extract_anomaly_contexts(
        series_df,
        geography_name_map=geo_map,
        indicator_name_map=ind_map,
        period_window=3,
        min_outlier_count=3,
    )
    print("Example context (first series):")
    print(json.dumps(contexts[0] if contexts else {}, indent=2))
Example context (first series):
{
  "Indicator": "Percentage of global population living in poverty (at $6.85/day)",
  "Country": "Canada",
  "Series": [
    {
      "YEAR": 2004,
      "VALUE": 1.0,
      "Imputed": false
    },
    {
      "YEAR": 2005,
      "VALUE": 1.0,
      "Imputed": false
    },
    {
      "YEAR": 2006,
      "VALUE": 1.0,
      "Imputed": false
    },
    {
      "YEAR": 2007,
      "VALUE": 0.0,
      "Imputed": false
    },
    {
      "YEAR": 2008,
      "VALUE": 1.0,
      "Imputed": false
    },
    {
      "YEAR": 2009,
      "VALUE": 1.0,
      "Imputed": false
    },
    {
      "YEAR": 2010,
      "VALUE": 0.0,
      "Imputed": false
    },
    {
      "YEAR": 2011,
      "VALUE": 1.0,
      "Imputed": false
    },
    {
      "YEAR": 2012,
      "VALUE": 1.0,
      "Imputed": false
    },
    {
      "YEAR": 2013,
      "VALUE": 1.0,
      "Imputed": false
    }
  ]
}

14.7. Step 5: LLM Prompting#

The prompt design instructs the LLM to:

  1. Treat anomalies as windows (start, end), not single points

  2. Classify into one of five categories

  3. Provide evidence strength and optional evidence sources

  4. Output strictly valid JSON matching the schema
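The constraint in point 4 is typically enforced with a strict JSON-schema response format. The sketch below shows the kind of schema get_anomaly_response_format() could plausibly return for the OpenAI API; the envelope (`type: json_schema`, `strict: true`) is the documented OpenAI structured-outputs shape, while the field names are assumptions taken from the prompt rules above.

```python
# Hedged sketch of a strict JSON-schema response format for anomaly
# explanations. Field names follow the prompt rules; the library's actual
# schema may differ.
anomaly_schema = {
    "type": "json_schema",
    "json_schema": {
        "name": "anomaly_explanation",
        "strict": True,
        "schema": {
            "type": "object",
            "properties": {
                "anomalies": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "start": {"type": "integer"},
                            "end": {"type": "integer"},
                            "is_anomaly": {"type": "boolean"},
                            "classification": {
                                "type": "string",
                                "enum": [
                                    "data_error",
                                    "external_driver",
                                    "measurement_system_update",
                                    "modeling_artifact",
                                    "insufficient_data",
                                ],
                            },
                            "evidence_strength": {"type": "string"},
                            "explanation": {"type": "string"},
                        },
                        "required": [
                            "start", "end", "is_anomaly", "classification",
                            "evidence_strength", "explanation",
                        ],
                        "additionalProperties": False,
                    },
                },
            },
            "required": ["anomalies"],
            "additionalProperties": False,
        },
    },
}
```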

user_template = Template(USER_PROMPT_TEMPLATE)
response_format = get_anomaly_response_format()

# Example: render prompt with a context
if contexts:
    sample_context = json.dumps(contexts[0], indent=2)
    sample_prompt = user_template.render(INPUT_SERIES_INFO=sample_context)
    print("Sample user prompt (first 1500 chars):")
    print(sample_prompt[:1500] + "...")
Sample user prompt (first 1500 chars):
# TASK
Validate the anomalies in the time series below, explain their most likely verifiable causes, and classify each anomaly window.

# ANALYSIS RULES
1. Treat anomalies as windows ([start, end]), not individual points; merge contiguous anomalous years.
2. Confirm anomalies only if they align with a verifiable event or clear data-quality issue.
3. The time series includes imputed values indicated by the "Imputed" column. Do not attempt to explain these values.
4. You may use general, well-documented historical knowledge (e.g., wars, natural disasters, global crises, pandemics, major policy reforms, or statistical revisions) when such events are clearly established in history and widely recognized.
5. If there is no match with known history or documented statistical events set is_anomaly=false and classification="insufficient_data".
6. Use one of these primary classifications:
   - "data_error" — placeholder, rounding, rebasing artifact, template issue, ingestion error, logical computation impossibility.
   - "external_driver" — macroeconomic or geopolitical event, conflict, policy reform, disaster, pandemic, global cycle.
   - "measurement_system_update" — rebasing, SNA/PPP revision, new census benchmark, classification change.
   - "modeling_artifact" — anomaly detector or transformation artifact.
   - "insufficient_data" — no verifiable cause.
7. Assign "evidence_strength" as one of:
   - "strong_direct" — clearly linked, well-documented event or revision.
   - "moderate_...

14.8. Step 6: LLM Inference#

The inference workflow has three stages:

  1. Build a JSONL file of batch requests (one per context) and save it to llm-input/

  2. Upload that file to the OpenAI Batch API or Gemini File API

  3. Poll for completion; download the results and save them to llm-output/

This step creates the batch requests and writes them to llm-input/. After you run the batch externally, responses will be in llm-output/.
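One line of the batch JSONL plausibly looks like the sketch below. The envelope fields (custom_id / method / url / body) are the documented OpenAI Batch API request format; the custom_id layout shown is an assumption, chosen to be consistent with the custom_id_parts=(0, 2, 3) parsing used in Step 7.

```python
import json

# One batch request per anomaly context; build_batch_file presumably writes
# many of these, one per line, into the JSONL file.
request = {
    "custom_id": "anomaly-req-WB_CSC_SI_POV_UMIC-CAN",  # layout assumed
    "method": "POST",
    "url": "/v1/chat/completions",
    "body": {
        "model": "gpt-4.1-mini",
        "messages": [
            {"role": "user", "content": "<rendered USER_PROMPT_TEMPLATE here>"},
        ],
        # A real request would also carry the strict JSON-schema
        # response_format; truncated here for brevity.
    },
}
line = json.dumps(request)
print(line[:60])
```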

shortlist.head(5)
            indicator_id geography_id  anomaly_score
0     WB_CSC_SI_POV_UMIC          CAN       6.754442
1  WB_CSC_EG_ELC_ACCS_ZS          SUR       6.725937
2  WB_CSC_EG_ELC_ACCS_ZS          BFA       6.668304
3     WB_CSC_SI_POV_DDAY          ESP       6.607846
4     WB_CSC_SI_POV_DDAY          ITA       6.584522
from datetime import datetime, timezone

# Build batch JSONL and save to llm-input (upload this to the batch API)
INPUT_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

provider = "openai"  # or "gemini" for Gemini Batch API
batch_fname = INPUT_DIR / f"{provider}-anomaly-{int(datetime.now(tz=timezone.utc).timestamp())}.jsonl"

if len(shortlist) > 0:
    path, count = build_batch_file(
        batch_fname,
        shortlist.head(5),  # Limit to 5 for demo; remove .head(5) for full run
        source_df,
        geo_map,
        ind_map,
        provider=provider,
        model_id="gpt-4.1-mini" if provider == "openai" else "gemini-2.0-flash",
    )
    print(f"Wrote {count} requests to llm-input: {path}")
    print(f"Upload to {provider} batch API; save response to llm-output/{provider}-anomaly-output.jsonl")
Wrote 5 requests to llm-input: ../../data/anomaly/CSC/llm-input/openai-anomaly-1774297027.jsonl
Upload to openai batch API; save response to llm-output/openai-anomaly-output.jsonl
# Upload, run batch, and download results (requires OPENAI_API_KEY or GEMINI_API_KEY)
# Set RUN_BATCH=True to execute; otherwise skip and upload manually
RUN_BATCH = False
output_fname = OUTPUT_DIR / f"{provider}-anomaly-output.jsonl"

if RUN_BATCH and len(shortlist) > 0 and path.exists():
    run_batch(
        provider,
        path,
        output_fname,
        model_id="gpt-4.1-mini" if provider == "openai" else "gemini-2.0-flash",
        poll_interval=60,
    )
    print(f"Output saved to {output_fname}")
else:
    print("Skipping batch run (set RUN_BATCH=True to upload and run). Save API output to", output_fname)

14.9. Step 7: Output Analysis#

Parse the batch output from llm-output/ (responses from the API after the batch completed) into a DataFrame for analysis and export.

# Parse batch output into anomalies DataFrame (uses output_fname from run_batch cell)
ind_map = globals().get("ind_map", {})
geo_map = globals().get("geo_map", {})
output_path = OUTPUT_DIR / f"{provider}-anomaly-output.jsonl"
if output_path.exists():
    anomalies_df = parse_batch_output(
        output_path,
        provider=provider,
        indicator_name_map=ind_map,
        geography_name_map=geo_map,
        custom_id_parts=(0, 2, 3),  # prefix, indicator_idx, geography_idx in custom_id
    )
    display(anomalies_df.head())
    anomalies_df.groupby("classification").agg(
        count=("confidence", "count"),
        mean_conf=("confidence", "mean"),
    )
else:
    print("Batch output not found. Run the batch cell (RUN_BATCH=True) or save API output to", output_path)

14.10. Step 8: Export for Reviewer App#

Export anomalies to JSON for the Anomaly Explanation Reviewer app.

Single explainer (or harmonized output):

from ai4data.anomaly.explanation import export_for_review
export_for_review(anomalies_df, timeseries_df=canonical_df, output_path="anomaly_review.json")

Multiple explainers (preserves each model’s output in tabbed UI):

from ai4data.anomaly.explanation import export_for_review_with_explainers, parse_batch_output

# Parse each provider's batch output
df_openai = parse_batch_output("openai-output.jsonl", "openai", ind_map, geo_map, custom_id_parts=(0, 2, 3))
df_gemini = parse_batch_output("gemini-output.jsonl", "gemini", ind_map, geo_map, custom_id_parts=(0, 2, 3))

export_for_review_with_explainers(
    [("OpenAI", df_openai), ("Gemini", df_gemini)],
    timeseries_df=canonical_df,
    output_path="anomaly_review.json",
    run_arbiter=True,      # optional: use arbiter LLM for primary classification
    invoke_llm=my_llm_fn,  # required if run_arbiter
)
# Then: uv run python -m apps.anomaly_review anomaly_review.json

14.11. Appendix: Using Your Own Data#

For custom data with different column names, use adapter_from_config:

from ai4data.anomaly.explanation import adapter_from_config

mapping = {
    "indicator_id": "YOUR_INDICATOR_COL",
    "indicator_name": "YOUR_INDICATOR_LABEL_COL",
    "geography_id": "YOUR_GEO_COL",
    "geography_name": "YOUR_GEO_LABEL_COL",
    "period": "YEAR",
    "value": "VALUE",
    "is_imputed": "Imputed",
    "anomaly_score": "absZscore",  # or create from Zscore
    "outlier_count": "outlier_indicator_total",
}
adapt = adapter_from_config(mapping)
# adapter_from_config returns a mapping of format-specific loaders; pick the
# one matching your input, as in Step 2 (which uses adapt["adapt_excel"]).
canonical_df = adapt["adapt_excel"]("your_anomalies.xlsx")