14. Timeseries Anomaly Explanation with LLMs#
14.1. Overview#
This notebook demonstrates a pipeline for identifying, contextualizing, and explaining anomalies in timeseries indicator data using Large Language Models (LLMs). The approach is generalized to work with any timeseries dataset (World Development Indicators, Corporate Scorecard, or custom indicators)—not just a single application.
14.1.1. Purpose#
Automatically generate verifiable explanations for significant data deviations, classifying each as one of:
External drivers — macroeconomic events, conflicts, policy reforms, disasters
Data errors — placeholders, ingestion issues, computation artifacts
Measurement system updates — rebasing, census revisions, classification changes
Modeling artifacts — anomaly detector or transformation artifacts
Insufficient data — when no verifiable cause can be identified
14.1.2. Learning Objectives#
The canonical data interface for timeseries anomaly inputs
How to adapt legacy formats (e.g., Scorecard wide format) to the canonical schema
The context extraction logic for building LLM prompts
How structured prompts and JSON schema constrain LLM outputs
How to parse and analyze batch LLM outputs
14.1.3. Prerequisites#
Python 3.11+ with `ai4data[anomaly]` and `ai4data[metadata]` installed
API keys for OpenAI and/or Gemini (for LLM inference)
14.2. Pipeline Overview#
flowchart LR
subgraph input [Input]
A[Wide CSV]
B[Anomaly Scores CSV]
end
subgraph adapt [Transform]
C[Adapter]
end
subgraph process [Pipeline]
D[Canonical Format]
E[Anomaly Ranking]
F[Context Extraction]
G[LLM Prompts]
H[Batch API]
I[Parsed Output]
end
A --> C
B --> C
C --> D
D --> E
E --> F
F --> G
G --> H
H --> I
14.3. Step 1: Understand the Data Interface#
The pipeline consumes data in a canonical long-format schema. This format is designed to be:
Indicator-agnostic — works for WDI, Scorecard, or custom indicators
Geography-agnostic — countries, regions, or any spatial unit
Explicit about anomaly metadata — pre-computed scores and imputation flags
14.3.1. Canonical Column Semantics#
| Column | Type | Description |
|---|---|---|
| `indicator_id` | str | Unique indicator code (e.g., `WB_CSC_SI_POV_DDAY`) |
| `indicator_name` | str | Human-readable indicator label |
| `geography_id` | str | Geography code (e.g., `CAN`) |
| `geography_name` | str | Human-readable geography label |
| `period` | int | Time period (year for annual data) |
| `value` | float | Indicator value (NaN allowed) |
| `is_imputed` | bool | Whether the value was imputed |
| `anomaly_score` | float | Pre-computed anomaly magnitude (e.g., \|z-score\|) |
| `outlier_count` | int | Number of detectors that flagged this point |
14.4. Step 2: Load and Adapt Input Data#
If your data is in a legacy format (e.g., Scorecard wide format with separate anomaly scores), use the adapter to convert to canonical format.
import json
import os
from pathlib import Path
import pandas as pd
from jinja2 import Template
from ai4data.anomaly.explanation import (
adapter_from_config,
build_batch_file,
extract_anomaly_contexts,
parse_batch_output,
run_batch,
)
from ai4data.anomaly.explanation.prompts import (
USER_PROMPT_TEMPLATE,
get_anomaly_response_format,
)
# Configuration: adjust paths to your data
# llm-input: processed batch requests (upload this to OpenAI/Gemini Batch API)
# llm-output: responses from the batch API after completion
os.environ["ANOMALY_DATA_DIR"] = "../../data/anomaly/CSC"
os.environ["ANOMALY_LLM_INPUT_DIR"] = "../../data/anomaly/CSC/llm-input"
os.environ["ANOMALY_LLM_OUTPUT_DIR"] = "../../data/anomaly/CSC/llm-output"
DATA_DIR = Path(os.environ.get("ANOMALY_DATA_DIR", "."))
INPUT_DIR = Path(os.environ.get("ANOMALY_LLM_INPUT_DIR", DATA_DIR / "llm-input"))
OUTPUT_DIR = Path(os.environ.get("ANOMALY_LLM_OUTPUT_DIR", DATA_DIR / "llm-output"))
# WIDE_PATH = DATA_DIR / "WB_CSC_WIDEF.csv"
# ANOMALY_PATH = DATA_DIR / "CSC_TOP_ANOMALIES_2026-02-06.CSV"
# if WIDE_PATH.exists() and ANOMALY_PATH.exists():
# adapter = ScorecardWideAdapter()
# canonical_df = adapter.load(WIDE_PATH, ANOMALY_PATH)
# else:
# canonical_df = pd.DataFrame()
# canonical_df = pd.read_excel(DATA_DIR / "CSC_ANOMALIES_2026-02-18.xlsx")
# Create the mapping for the adapter to convert the input format to the canonical format
mapping = {
"indicator_id": "INDICATOR",
"indicator_name": "INDICATOR_LABEL",
"geography_id": "REF_AREA",
"geography_name": "REF_AREA_LABEL",
"period": "YEAR",
"value": "VALUE",
"is_imputed": "Imputed",
"anomaly_score": "absZscore_zscore", # or create from Zscore
"outlier_count": "outlier_indicator_total",
}
adapt = adapter_from_config(mapping)
canonical_df = adapt["adapt_excel"](DATA_DIR / "CSC_ANOMALIES_2026-02-18.xlsx")
14.5. Step 3: Anomaly Identification#
We keep series where at least three anomaly detectors agree (`outlier_count >= 3`), exclude imputed values, and rank by the summed `anomaly_score` per (indicator, geography) pair. This produces a shortlist of (indicator, geography) pairs to explain.
shortlist = pd.DataFrame()
max_count = 10_000
if len(canonical_df) > 0:
# Filter: at least 3 detectors agree, exclude imputed
filtered = canonical_df[
(canonical_df["outlier_count"] >= 3) & (~canonical_df["is_imputed"])
]
# Rank by sum of anomaly_score per (indicator, geography)
ranked = (
filtered.groupby(["indicator_id", "geography_id"])["anomaly_score"]
.sum()
.sort_values(ascending=False)
.reset_index()
)
shortlist = ranked.head(max_count) # Cap for batch processing
print(f"Shortlist: {len(shortlist)} series")
display(shortlist.head())
Shortlist: 870 series
|   | indicator_id | geography_id | anomaly_score |
|---|---|---|---|
| 0 | WB_CSC_SI_POV_UMIC | CAN | 6.754442 |
| 1 | WB_CSC_EG_ELC_ACCS_ZS | SUR | 6.725937 |
| 2 | WB_CSC_EG_ELC_ACCS_ZS | BFA | 6.668304 |
| 3 | WB_CSC_SI_POV_DDAY | ESP | 6.607846 |
| 4 | WB_CSC_SI_POV_DDAY | ITA | 6.584522 |
14.6. Step 4: Context Generation#
For each (indicator, geography) in the shortlist, we extract a time-windowed context around the anomaly years. Overlapping windows are merged into contiguous ranges so the LLM sees a single coherent snippet per anomaly cluster.
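The merging step can be sketched as follows — a simplified, hypothetical stand-in for what `extract_anomaly_contexts` does internally, not the library's actual implementation. Each anomalous year is padded by `period_window`, then overlapping or touching windows are collapsed:

```python
def merge_windows(years, period_window=3):
    """Pad each anomalous year by +/- period_window and merge
    overlapping or touching windows into contiguous ranges."""
    windows = sorted((y - period_window, y + period_window) for y in years)
    merged = []
    for start, end in windows:
        if merged and start <= merged[-1][1] + 1:
            # Overlaps or touches the previous window: extend it
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged

# Anomalies in 2007 and 2010 with a 3-year window collapse into one range
print(merge_windows([2007, 2010]))  # [(2004, 2013)]
```

This is why the example context below spans 2004–2013: the windows around the 2007 and 2010 anomalies overlap and become a single snippet.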
if len(canonical_df) > 0 and len(shortlist) > 0:
# Build name maps for context
geo_map = canonical_df.set_index("geography_id")["geography_name"].to_dict()
ind_map = canonical_df.set_index("indicator_id")["indicator_name"].to_dict()
# Index by (indicator_id, geography_id) for fast lookup
source_df = canonical_df.set_index(["indicator_id", "geography_id"]).sort_index()
# Extract one example context
row = shortlist.iloc[0]
ind_id, geo_id = row["indicator_id"], row["geography_id"]
series_df = source_df.loc[(ind_id, geo_id)].reset_index()
contexts = extract_anomaly_contexts(
series_df,
geography_name_map=geo_map,
indicator_name_map=ind_map,
period_window=3,
min_outlier_count=3,
)
print("Example context (first series):")
print(json.dumps(contexts[0] if contexts else {}, indent=2))
Example context (first series):
{
"Indicator": "Percentage of global population living in poverty (at $6.85/day)",
"Country": "Canada",
"Series": [
{
"YEAR": 2004,
"VALUE": 1.0,
"Imputed": false
},
{
"YEAR": 2005,
"VALUE": 1.0,
"Imputed": false
},
{
"YEAR": 2006,
"VALUE": 1.0,
"Imputed": false
},
{
"YEAR": 2007,
"VALUE": 0.0,
"Imputed": false
},
{
"YEAR": 2008,
"VALUE": 1.0,
"Imputed": false
},
{
"YEAR": 2009,
"VALUE": 1.0,
"Imputed": false
},
{
"YEAR": 2010,
"VALUE": 0.0,
"Imputed": false
},
{
"YEAR": 2011,
"VALUE": 1.0,
"Imputed": false
},
{
"YEAR": 2012,
"VALUE": 1.0,
"Imputed": false
},
{
"YEAR": 2013,
"VALUE": 1.0,
"Imputed": false
}
]
}
14.7. Step 5: LLM Prompting#
The prompt design instructs the LLM to:
Treat anomalies as windows (start, end), not single points
Classify into one of five categories
Provide evidence strength and optional evidence sources
Output strictly valid JSON matching the schema
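To illustrate how a JSON schema constrains the output, a simplified response format might look like the following. This is a sketch only: the field names mirror those used elsewhere in this notebook (`is_anomaly`, `classification`, `evidence_strength`, `confidence`), but the actual schema returned by `get_anomaly_response_format` may define different or additional fields.

```python
import json

# Simplified sketch of a structured-output schema for anomaly explanations;
# the real schema from get_anomaly_response_format() may differ.
anomaly_schema = {
    "type": "json_schema",
    "json_schema": {
        "name": "anomaly_explanation",
        "schema": {
            "type": "object",
            "properties": {
                "anomalies": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "start_year": {"type": "integer"},
                            "end_year": {"type": "integer"},
                            "is_anomaly": {"type": "boolean"},
                            "classification": {
                                "type": "string",
                                "enum": [
                                    "data_error",
                                    "external_driver",
                                    "measurement_system_update",
                                    "modeling_artifact",
                                    "insufficient_data",
                                ],
                            },
                            "evidence_strength": {"type": "string"},
                            "confidence": {"type": "number"},
                            "explanation": {"type": "string"},
                        },
                        "required": ["start_year", "end_year",
                                     "is_anomaly", "classification"],
                    },
                }
            },
            "required": ["anomalies"],
        },
    },
}
print(json.dumps(anomaly_schema)[:80])
```

Enumerating the five classifications in the schema means the model cannot invent new category labels, which keeps downstream aggregation by `classification` reliable.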
user_template = Template(USER_PROMPT_TEMPLATE)
response_format = get_anomaly_response_format()
# Example: render prompt with a context
if contexts:
sample_context = json.dumps(contexts[0], indent=2)
sample_prompt = user_template.render(INPUT_SERIES_INFO=sample_context)
print("Sample user prompt (first 1500 chars):")
print(sample_prompt[:1500] + "...")
Sample user prompt (first 1500 chars):
# TASK
Validate the anomalies in the time series below, explain their most likely verifiable causes, and classify each anomaly window.
# ANALYSIS RULES
1. Treat anomalies as windows ([start, end]), not individual points; merge contiguous anomalous years.
2. Confirm anomalies only if they align with a verifiable event or clear data-quality issue.
3. The time series includes imputed values indicated by the "Imputed" column. Do not attempt to explain these values.
4. You may use general, well-documented historical knowledge (e.g., wars, natural disasters, global crises, pandemics, major policy reforms, or statistical revisions) when such events are clearly established in history and widely recognized.
5. If there is no match with known history or documented statistical events set is_anomaly=false and classification="insufficient_data".
6. Use one of these primary classifications:
- "data_error" — placeholder, rounding, rebasing artifact, template issue, ingestion error, logical computation impossibility.
- "external_driver" — macroeconomic or geopolitical event, conflict, policy reform, disaster, pandemic, global cycle.
- "measurement_system_update" — rebasing, SNA/PPP revision, new census benchmark, classification change.
- "modeling_artifact" — anomaly detector or transformation artifact.
- "insufficient_data" — no verifiable cause.
7. Assign "evidence_strength" as one of:
- "strong_direct" — clearly linked, well-documented event or revision.
- "moderate_...
14.8. Step 6: LLM Inference#
Build a JSONL file of batch requests (one per context) and save it to `llm-input/`
Upload that file to the OpenAI Batch API or Gemini File API
Poll for completion; download the results and save them to `llm-output/`
This step creates the batch requests and writes them to llm-input/. After you run the batch externally, responses will be in llm-output/.
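For orientation, each line in the JSONL follows the OpenAI Batch API request shape. The example below is a sketch: the exact `custom_id` layout produced by `build_batch_file` is an assumption here, inferred from the `custom_id_parts=(0, 2, 3)` argument used later (index 0 = prefix, 2 = indicator, 3 = geography when split on `-`).

```python
import json

# Hypothetical batch request line; custom_id encodes the series identity
# so each response can be joined back to its (indicator, geography) pair.
request_line = {
    "custom_id": "anomaly-0-WB_CSC_SI_POV_UMIC-CAN",
    "method": "POST",
    "url": "/v1/chat/completions",
    "body": {
        "model": "gpt-4.1-mini",
        "messages": [
            {"role": "user",
             "content": "Validate the anomalies in the time series below..."},
        ],
        # In practice this would be the full schema from
        # get_anomaly_response_format(), not a bare json_object
        "response_format": {"type": "json_object"},
    },
}
print(json.dumps(request_line)[:60])
```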
shortlist.head(5)
|   | indicator_id | geography_id | anomaly_score |
|---|---|---|---|
| 0 | WB_CSC_SI_POV_UMIC | CAN | 6.754442 |
| 1 | WB_CSC_EG_ELC_ACCS_ZS | SUR | 6.725937 |
| 2 | WB_CSC_EG_ELC_ACCS_ZS | BFA | 6.668304 |
| 3 | WB_CSC_SI_POV_DDAY | ESP | 6.607846 |
| 4 | WB_CSC_SI_POV_DDAY | ITA | 6.584522 |
from datetime import datetime, timezone
# Build batch JSONL and save to llm-input (upload this to the batch API)
INPUT_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
provider = "openai" # or "gemini" for Gemini Batch API
batch_fname = INPUT_DIR / f"{provider}-anomaly-{int(datetime.now(tz=timezone.utc).timestamp())}.jsonl"
if len(shortlist) > 0:
path, count = build_batch_file(
batch_fname,
shortlist.head(5), # Limit to 5 for demo; remove .head(5) for full run
source_df,
geo_map,
ind_map,
provider=provider,
model_id="gpt-4.1-mini" if provider == "openai" else "gemini-2.0-flash",
)
print(f"Wrote {count} requests to llm-input: {path}")
print(f"Upload to {provider} batch API; save response to llm-output/{provider}-anomaly-output.jsonl")
Wrote 5 requests to llm-input: ../../data/anomaly/CSC/llm-input/openai-anomaly-1774297027.jsonl
Upload to openai batch API; save response to llm-output/openai-anomaly-output.jsonl
# Upload, run batch, and download results (requires OPENAI_API_KEY or GEMINI_API_KEY)
# Set RUN_BATCH=True to execute; otherwise skip and upload manually
RUN_BATCH = False
output_fname = OUTPUT_DIR / f"{provider}-anomaly-output.jsonl"
if RUN_BATCH and len(shortlist) > 0 and path.exists():
run_batch(
provider,
path,
output_fname,
model_id="gpt-4.1-mini" if provider == "openai" else "gemini-2.0-flash",
poll_interval=60,
)
print(f"Output saved to {output_fname}")
else:
print("Skipping batch run (set RUN_BATCH=True to upload and run). Save API output to", output_fname)
14.9. Step 7: Output Analysis#
Parse the batch output from llm-output/ (responses from the API after the batch completed) into a DataFrame for analysis and export.
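At its core, parsing a batch-output line means decoding two layers of JSON: the batch envelope and the model's structured content inside it. The sketch below follows the OpenAI Batch API response shape (`response.body.choices[0].message.content`); `parse_batch_output` wraps this kind of logic, plus Gemini handling and `custom_id` decoding, so its internals may differ.

```python
import json

# A fabricated OpenAI batch-output line for illustration
line = json.dumps({
    "custom_id": "anomaly-0-WB_CSC_SI_POV_UMIC-CAN",
    "response": {
        "status_code": 200,
        "body": {"choices": [{"message": {"content": '{"anomalies": []}'}}]},
    },
})

record = json.loads(line)                  # outer batch envelope
content = record["response"]["body"]["choices"][0]["message"]["content"]
payload = json.loads(content)              # inner structured LLM output
print(record["custom_id"], payload)
```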
14.10. Step 8: Export for Reviewer App#
Export anomalies to JSON for the Anomaly Explanation Reviewer app.
Single explainer (or harmonized output):
from ai4data.anomaly.explanation import export_for_review
export_for_review(anomalies_df, timeseries_df=canonical_df, output_path="anomaly_review.json")
Multiple explainers (preserves each model’s output in tabbed UI):
from ai4data.anomaly.explanation import export_for_review_with_explainers, parse_batch_output
# Parse each provider's batch output
df_openai = parse_batch_output("openai-output.jsonl", "openai", ind_map, geo_map, custom_id_parts=(0, 2, 3))
df_gemini = parse_batch_output("gemini-output.jsonl", "gemini", ind_map, geo_map, custom_id_parts=(0, 2, 3))
export_for_review_with_explainers(
[("OpenAI", df_openai), ("Gemini", df_gemini)],
timeseries_df=canonical_df,
output_path="anomaly_review.json",
run_arbiter=True, # optional: use arbiter LLM for primary classification
invoke_llm=my_llm_fn, # required if run_arbiter
)
# Then: uv run python -m apps.anomaly_review anomaly_review.json
# Parse batch output into anomalies DataFrame (uses output_fname from run_batch cell)
ind_map = globals().get("ind_map", {})
geo_map = globals().get("geo_map", {})
output_path = OUTPUT_DIR / f"{provider}-anomaly-output.jsonl"
if output_path.exists():
anomalies_df = parse_batch_output(
output_path,
provider=provider,
indicator_name_map=ind_map,
geography_name_map=geo_map,
custom_id_parts=(0, 2, 3), # prefix, indicator_idx, geography_idx in custom_id
)
display(anomalies_df.head())
anomalies_df.groupby("classification").agg(
count=("confidence", "count"),
mean_conf=("confidence", "mean"),
)
else:
print("Batch output not found. Run the batch cell (RUN_BATCH=True) or save API output to", output_path)
14.11. Appendix: Using Your Own Data#
For custom data with different column names, use adapter_from_config:
from ai4data.anomaly.explanation import adapter_from_config
mapping = {
"indicator_id": "YOUR_INDICATOR_COL",
"indicator_name": "YOUR_INDICATOR_LABEL_COL",
"geography_id": "YOUR_GEO_COL",
"geography_name": "YOUR_GEO_LABEL_COL",
"period": "YEAR",
"value": "VALUE",
"is_imputed": "Imputed",
"anomaly_score": "absZscore", # or create from Zscore
"outlier_count": "outlier_indicator_total",
}
adapt = adapter_from_config(mapping)
canonical_df = adapt("wide.csv", "anomalies.csv")
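If you only need the column renaming (without whatever merging and validation the ai4data adapter performs), the core of the adaptation can be done with plain pandas. This is a minimal sketch assuming your file already contains one row per (indicator, geography, period); `adapt_manually` is a hypothetical helper, not part of ai4data:

```python
import pandas as pd

def adapt_manually(df, mapping):
    """Rename source columns to the canonical schema.
    mapping: canonical_name -> source_column_name."""
    missing = [src for src in mapping.values() if src not in df.columns]
    if missing:
        raise KeyError(f"Source columns not found: {missing}")
    inverse = {src: canonical for canonical, src in mapping.items()}
    return df.rename(columns=inverse)[list(mapping)]

# Illustrative source frame with legacy column names
raw = pd.DataFrame({
    "INDICATOR": ["WB_CSC_SI_POV_DDAY"],
    "INDICATOR_LABEL": ["Poverty headcount"],
    "REF_AREA": ["ESP"],
    "REF_AREA_LABEL": ["Spain"],
    "YEAR": [2015],
    "VALUE": [1.2],
    "Imputed": [False],
    "absZscore": [6.6],
    "outlier_indicator_total": [4],
})
mapping = {
    "indicator_id": "INDICATOR",
    "indicator_name": "INDICATOR_LABEL",
    "geography_id": "REF_AREA",
    "geography_name": "REF_AREA_LABEL",
    "period": "YEAR",
    "value": "VALUE",
    "is_imputed": "Imputed",
    "anomaly_score": "absZscore",
    "outlier_count": "outlier_indicator_total",
}
canonical = adapt_manually(raw, mapping)
print(list(canonical.columns))
```

The rest of the pipeline (shortlisting, context extraction, batching) only touches canonical column names, so once this rename succeeds your data should flow through unchanged.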