Batch Processing Implementation#
Introduction#
This notebook demonstrates a comprehensive batch processing implementation for data labeling of climate change-related documents. The workflow involves several key steps, including loading and preprocessing documents, extracting relevant dataset mentions, validating the extracted mentions, and finally, using an autonomous reasoning agent to ensure the quality and accuracy of the extracted data.
The primary goal is to identify and categorize dataset mentions from research papers and policy documents, ensuring they are correctly classified based on naming specificity, context, and relevance. This process leverages various tools and libraries, including the OpenAI API, Hugging Face Transformers, and PyMuPDF, to automate and streamline the data labeling process.
The notebook is structured as follows:
Loading and Preprocessing: Load documents from files or URLs and preprocess the text for extraction.
Extraction: Prefilter pages with a pre-trained classifier for efficiency, then extract dataset mentions from the remaining text.
Validation: Validate the extracted mentions using a judge model to ensure correctness.
Reasoning: Apply an autonomous reasoning agent to further refine and validate the extracted data.
Batch Processing: Create and submit batch files for processing by the OpenAI API.
Results Consolidation: Consolidate and save the results for further analysis.
This workflow aims to produce accurate and reliable labels for climate change-related documents, facilitating further research and analysis in this critical field.
# install the required packages
!pip install --upgrade openai pymupdf transformers torch python-dotenv nltk scikit-learn networkx beautifulsoup4
import os

from dotenv import load_dotenv
from openai import OpenAI

# Read OPENAI_API_KEY from a local .env file or the environment (python-dotenv is installed above);
# alternatively, replace the fallback value below with your key.
load_dotenv()
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "YOUR_API_KEY"))
Load our prefiltering model from Hugging Face
# silence warnings
import warnings
warnings.filterwarnings("ignore")
from transformers import AutoTokenizer
from transformers import pipeline
import os
os.environ["TOKENIZERS_PARALLELISM"] = "true"
# load tokenizer from huggingface.co/models using our repository id
data_model_id = "ai4data-use/bert-base-uncased-data-use"
tokenizer = AutoTokenizer.from_pretrained(data_model_id)
# load the model from huggingface.co/models using our repository id
classifier = pipeline("text-classification", model=data_model_id, tokenizer=tokenizer)
Load Helper Functions
def chunk_text(text, tokenizer, max_length=500):
"""
Split the text into chunks of max_length tokens, ensuring no chunk exceeds the model's token limit,
and includes special tokens properly.
Args:
text (str): The input text to be chunked.
tokenizer: The tokenizer to use for encoding and decoding.
max_length (int): The maximum length of tokens allowed in each chunk, including special tokens.
Returns:
list: A list of text chunks as strings.
"""
# Reserve space for special tokens (e.g., [CLS], [SEP])
special_tokens_count = 2 # Adjust based on the tokenizer's special token usage
chunk_size = max_length - special_tokens_count
# Tokenize the text into token IDs without truncation
tokens = tokenizer.encode(text, add_special_tokens=False)
# Split the tokens into chunks
chunks = []
for i in range(0, len(tokens), chunk_size):
token_chunk = tokens[i : i + chunk_size]
# Add special tokens to the chunk
token_chunk_with_specials = (
[tokenizer.cls_token_id] + token_chunk + [tokenizer.sep_token_id]
)
# Decode the chunk back to text
chunk_text = tokenizer.decode(
token_chunk_with_specials, skip_special_tokens=False
)
chunks.append(chunk_text)
return chunks
import re


def clean_extracted_text(text):
"""
Cleans text extracted from PDFs using PyMuPDF.
- Reduces unnecessary whitespace and artifacts while preserving meaningful structure.
- Prevents unintentional removal of spaces or concatenation of words.
"""
# Replace non-breaking spaces (\xa0) with regular spaces
text = text.replace("\xa0", " ")
# Remove control characters (ASCII 0-31) except line breaks
text = re.sub(r"[\x00-\x08\x0B-\x1F]", "", text)
# Collapse excessive newlines (more than 2) but preserve single newlines
text = re.sub(r"\n{3,}", "\n\n", text)
# Collapse multiple spaces but preserve single spaces between words
text = re.sub(r"[ \t]{2,}", " ", text)
    # Join words hyphenated across line breaks, keeping the hyphen (e.g., "address-\nclimate" to "address-climate")
    text = re.sub(r"([a-zA-Z])-\n([a-zA-Z])", r"\1-\2", text)
# Trim leading/trailing spaces and newlines
text = text.strip()
return text
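As a quick illustration of what the cleaning does, consider a small made-up string with a hyphenated line break, non-breaking spaces, and extra blank lines (a sketch, not output from a real PDF):
# Toy example of clean_extracted_text (illustrative only)
messy = "Climate-\nrelated   losses\xa0rose.\n\n\n\nSee Table 2."
print(clean_extracted_text(messy))
# Expected output (roughly): "Climate-related losses rose.\n\nSee Table 2."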
from typing import Callable
def should_process_page(text: str, classifier: Callable, tokenizer) -> bool:
"""Determine whether a page should be processed."""
chunks = chunk_text(text, tokenizer, max_length=500)
return any(classifier(chunk)[0]["label"] != "NO_DATA" for chunk in chunks)
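For example, assuming the classifier and tokenizer loaded earlier, the prefilter can be sanity-checked on a made-up page; the result depends on the model's labels, so treat this as a sketch rather than a guaranteed output:
# Quick sanity check of the prefilter on a made-up page (illustrative only)
sample_page = (
    "We analyse the 2019 Demographic and Health Survey (DHS) for Ghana "
    "to estimate child nutrition outcomes."
)
print(should_process_page(sample_page, classifier, tokenizer))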
def save_text_per_document(text, text_output_path, page_idx):
"""
Save cleaned text for each page to a single JSON file, appending page data.
Parameters:
text (str): The cleaned text for the current page.
text_output_path (str): The path to the text JSON file.
page_idx (int): The current page index.
Returns:
None
"""
# Load existing text data or create a new structure
if os.path.exists(text_output_path):
with open(text_output_path, "r", encoding="utf-8") as existing_file:
text_data = json.load(existing_file)
else:
text_data = {
"source": os.path.splitext(os.path.basename(text_output_path))[0],
"pages": {},
}
# Add text for the current page
text_data["pages"][str(page_idx + 1)] = text
# Save the updated text data
os.makedirs(os.path.dirname(text_output_path), exist_ok=True)
with open(text_output_path, "w", encoding="utf-8") as text_file:
json.dump(text_data, text_file, indent=4)
def save_texts(raw_text: str, text: str, paths: dict, page_idx: int):
"""Save raw and cleaned text for the page."""
save_text_per_document(raw_text, paths["raw_text_output"], page_idx)
save_text_per_document(text, paths["text_output"], page_idx)
def generate_file_paths(base_name: str):
"""Generate file paths for saving outputs."""
return {
"text_output": f"output/text/{base_name}.json",
"raw_text_output": f"output/raw_text/{base_name}.json",
}
import json
import os

import nltk
from nltk.tokenize import sent_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Download the sentence tokenizer models that sent_tokenize relies on
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)
def find_best_matching_span(text, snippet, window: int = 1):
sents = sent_tokenize(text)
tfidf = TfidfVectorizer(ngram_range=(1, 3))
mi_vec = tfidf.fit_transform([snippet])
sents_vec = tfidf.transform(sents)
mx_idx = cosine_similarity(mi_vec, sents_vec).flatten().argmax()
span_sents = sents[max(mx_idx - window, 0) : min(mx_idx + window + 1, len(sents))]
return {
"match_idx": mx_idx,
"match_sent": sents[mx_idx],
"match_span_sents": span_sents,
"match_span": " ".join(span_sents),
}
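To see what the TF-IDF matcher returns, here is a small made-up example (it needs the punkt models downloaded above); the snippet is matched to the most similar sentence:
# Illustrative only: find the sentence that best matches a snippet
toy_text = (
    "We study household welfare in West Africa. "
    "The LSMS-ISA data for Nigeria is analysed in detail. "
    "Results are robust to alternative specifications."
)
print(find_best_matching_span(toy_text, "LSMS-ISA data for Nigeria")["match_sent"])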
def find_empirical_span(
text: str, sentences: list, best_match_idx: int, window: int = 1
):
# Define the start and end indices to include adjacent sentences for context
start_idx = text.index(sentences[max(best_match_idx - window, 0)])
last_sent = sentences[min(best_match_idx + window, len(sentences) - 1)]
    # Search for last_sent starting from start_idx so that an earlier occurrence of
    # the same sentence elsewhere in the text is not matched by mistake.
    end_idx = start_idx + text[start_idx:].index(last_sent) + len(last_sent)
# Extract the final span
context_span = text[start_idx:end_idx]
return {
"empirical_span": context_span, # Extracted span
"start_idx": start_idx,
"end_idx": end_idx,
}
def get_empirical_mentioned_in(
text, mentioned_in, window: int = 1, with_match_output: bool = False
):
"""
Extract the most relevant span of text from the original document (`text`)
that matches the `mentioned_in` field. Returns the span, label, start, and end indices.
"""
# Tokenize the text into sentences
sentences = sent_tokenize(text)
match_output = find_best_matching_span(text, mentioned_in, window=window)
best_match_idx = match_output["match_idx"]
output = find_empirical_span(text, sentences, best_match_idx, window=window)
output["empirical_mentioned_in"] = output.pop("empirical_span")
output = {
"label": "mentioned_in", # Label as "mentioned_in"
**output,
}
if with_match_output:
output.update(match_output)
return output
# load helper functions
from copy import deepcopy
import networkx as nx
def consolidate_dataset(raw_text: str, data: dict):
text = raw_text
page_data = {"dataset_used": data.get("dataset_used", False), "data_mentions": []}
G = nx.Graph()
sents = sent_tokenize(text)
_datasets = []
for ds in data.get("dataset", []):
mentioned_in = ds.pop("mentioned_in") or ""
try:
mi = find_best_matching_span(mentioned_in, ds["raw_name"], window=0)
mi = mi["match_span"]
match_output = find_best_matching_span(text, mi, window=1)
except ValueError:
# Likely that the `mentioned_in` is not found in the text or not correct.
# We try expanding the search to the entire text.
match_output = find_best_matching_span(text, ds["raw_name"], window=1)
ds["sent_spans"] = match_output["match_span_sents"]
sents_idx = sorted([sents.index(s) for s in ds["sent_spans"]])
ds["sent"] = match_output["match_sent"]
ds["sent_idx"] = sents_idx
G.add_edges_from(zip(sents_idx[:-1], sents_idx[1:]))
_datasets.append(ds)
_datasets = sorted(_datasets, key=lambda x: x["sent_idx"][0])
# The connected components in the graphs form the `mentioned_in`s.
mentioned_ins = sorted(
[sorted(x) for x in nx.connected_components(G)], key=lambda x: x[0]
)
updated_mentions = []
for midx in mentioned_ins:
_mi = {"mentioned_in": " ".join([sents[i] for i in midx]), "datasets": []}
for ds in _datasets:
ds = deepcopy(ds)
if ds["sent_idx"][0] in midx:
ds.pop("sent_idx")
ds.pop("sent_spans")
_mi["datasets"].append(ds)
updated_mentions.append(_mi)
page_data["data_mentions"] = updated_mentions
return page_data
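The grouping step above relies on connected components over sentence indices: datasets whose context sentences overlap or touch end up under the same `mentioned_in`. A tiny standalone illustration of that idea, with made-up indices:
import networkx as nx

# Two datasets matched around sentences [2, 3] and [3, 4] share sentence 3,
# so they merge into one group; a third dataset around [10, 11] stays separate.
G_demo = nx.Graph()
G_demo.add_edges_from([(2, 3), (3, 4), (10, 11)])
print(sorted(sorted(c) for c in nx.connected_components(G_demo)))  # [[2, 3, 4], [10, 11]]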
def save_output_per_document(raw_text, data, output_path, page_idx):
"""
Save output data to a JSON file per document, appending new page data.
Parameters:
data (LabelledResponseFormat): The data to save, in the validated format.
output_path (str): The output path for the document-wide JSON file.
page_idx (int): The current page index being processed.
Returns:
None
"""
# Restructure and consolidate dataset if possible
page_data = consolidate_dataset(raw_text, data)
# Initialize the new page's data structure
page_data = {"page": page_idx + 1, **page_data}
# Check if the file already exists
if os.path.exists(output_path):
with open(output_path, "r", encoding="utf-8") as existing_file:
document_data = json.load(existing_file)
else:
# Create a new JSON structure
document_data = {
"source": os.path.splitext(os.path.basename(output_path))[0],
"pages": [],
}
# Append the new page data
document_data["pages"].append(page_data)
# Save the updated document data back to the file
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, "w", encoding="utf-8") as output_file:
json.dump(document_data, output_file, indent=4)
# Private helper from the OpenAI SDK that converts a Pydantic model into the
# `response_format` parameter expected by the Chat Completions API.
from openai.lib._parsing._completions import type_to_response_format_param
def build_payload(page, prompt, response_format=None):
"""
Constructs a properly formatted OpenAI API payload for batch processing.
Args:
page (str): Text content of the page.
prompt (str): System prompt for guidance.
response_format (LabelledResponseFormat): Response schema from `StructuredOutputs`.
Returns:
dict: JSON payload formatted for OpenAI batch API.
"""
default_kwargs = dict(
model=MODEL,
temperature=0,
        max_tokens=16383,  # upper bound on completion tokens per request
top_p=1,
frequency_penalty=0,
presence_penalty=0,
seed=42,
)
# Ensure response_format uses JSON schema from LabelledResponseFormat
payload = dict(
messages=[
{"role": "system", "content": [{"type": "text", "text": prompt}]},
{"role": "user", "content": [{"type": "text", "text": page}]},
],
# response_format={"type": "json_schema", "schema": response_format.model_json_schema()}, # to_strict_json_schema
response_format=type_to_response_format_param(response_format)
if response_format
else {"type": "text"},
**default_kwargs,
)
return payload
import pymupdf
import requests
import tempfile
def load_doc(fname_or_url: str, n_pages: int = 1) -> list:
"""
Loads a PDF document from a file or URL and extracts content from it.
Args:
fname_or_url (str): The path to the PDF file or a URL where the file can be downloaded.
n_pages (int, optional): The number of pages to extract. Defaults to 1.
Returns:
list: A list of dictionaries containing the extracted text and page indices.
Raises:
ValueError: If the number of pages is not greater than 0.
Exception: If the PDF file fails to download from the specified URL or if there's an issue loading the document.
"""
    # Validate that the number of pages is greater than 0
    if n_pages <= 0:
        raise ValueError("The number of pages must be greater than 0.")
def _load_doc(fname: str) -> list:
"""
Creates content from two successive pages.
Args:
fname (str): The path to the PDF file.
Returns:
list: A list of dictionaries containing the extracted text and page indices.
"""
# Initialize an empty list to store the contents
contents = []
# Open the PDF document
doc = pymupdf.open(fname)
# Iterate over the pages, skipping the last n_pages - 1 pages
for page_idx in range(len(doc) - (n_pages - 1)):
# Extract text from each of the next n_pages pages and store it as a dictionary
contents.append(
dict(
text="\n\n".join(
[doc[page_idx + i].get_text() for i in range(n_pages)]
),
pages=[page_idx + i for i in range(n_pages)],
)
)
# Validate that all pages were loaded successfully
assert len(doc) - (n_pages - 1) == len(contents), "Failed to load all pages."
return contents
# Check if the file or URL starts with 'http:' or 'https:'
if fname_or_url.startswith(("http:", "https:")):
# Download the PDF file from the specified URL
with tempfile.NamedTemporaryFile(suffix=".pdf") as temp_pdf:
response = requests.get(fname_or_url, stream=True)
if response.status_code == 200:
# Write the downloaded data to the temporary file
for chunk in response.iter_content(chunk_size=8192):
temp_pdf.write(chunk)
# Seek back to the beginning of the file and return the loaded document
temp_pdf.seek(0)
return _load_doc(temp_pdf.name)
else:
# Raise an exception if there's an issue with the download or loading the document
raise Exception(
f"Failed to download PDF, status code: {response.status_code}"
)
else:
# If it's not a URL, simply load the document from the specified file path
return _load_doc(fname_or_url)
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum
# Define Enums for categorical fields
class Context(str, Enum):
background = "background"
supporting = "supporting"
primary = "primary"
class Specificity(str, Enum):
properly_named = "properly_named"
descriptive_but_unnamed = "descriptive_but_unnamed"
vague_generic = "vague_generic"
class Relevance(str, Enum):
directly_relevant = "directly_relevant"
indirectly_relevant = "indirectly_relevant"
not_relevant = "not_relevant"
class DatasetEntry(BaseModel):
raw_name: Optional[str] = Field(
..., description="The exact dataset name as it appears in the text."
)
harmonized_name: Optional[str] = Field(
None, description="The standardized or full name of the dataset."
)
acronym: Optional[str] = Field(
None, description="The short name or acronym associated with the dataset."
)
context: Context
specificity: Specificity
relevance: Relevance
mentioned_in: Optional[str] = Field(
None, description="The exact text excerpt where the dataset is mentioned."
)
producer: Optional[str] = Field(
None, description="The organization responsible for producing the dataset."
)
data_type: Optional[str] = Field(
None, description="The type of data represented by the dataset."
)
class LabelledResponseFormat(BaseModel):
dataset: List[DatasetEntry] = Field(
..., description="A list of datasets mentioned in the paper."
)
dataset_used: bool = Field(
..., description="A boolean indicating if a dataset is used in the paper."
)
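As a quick sanity check of the schema, here is a minimal made-up instance that validates against LabelledResponseFormat (the values are illustrative, not extracted from a real document):
example_entry = DatasetEntry(
    raw_name="LSMS-ISA",
    harmonized_name="Living Standards Measurement Study - Integrated Surveys on Agriculture",
    acronym="LSMS-ISA",
    context=Context.primary,
    specificity=Specificity.properly_named,
    relevance=Relevance.directly_relevant,
    mentioned_in="The LSMS-ISA data is analyzed to assess the impact of agricultural practices.",
    producer=None,
    data_type="Surveys & Census Data",
)
example_response = LabelledResponseFormat(dataset=[example_entry], dataset_used=True)
print(example_response.model_dump_json(indent=2))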
DATA_USE_TASK_PROMPT = """You are an expert in extracting and categorizing dataset mentions from research papers and policy documents. Your task is to **identify and extract all valid dataset mentions**, ensuring they are correctly classified based on naming specificity, context, and relevance.
### **What Qualifies as a Dataset?**
A dataset is a structured collection of data used for empirical research, analysis, or policy-making. Examples include:
- **Surveys & Census Data** (e.g., LSMS, DHS, national census records)
- **Indicators & Indexes** (e.g., HDI, GFSI, WDI, ND-GAIN, EPI)
- **Geospatial & Environmental Data** (e.g., OpenStreetMap, Sentinel-2 imagery)
- **Economic & Trade Data** (e.g., UN Comtrade, Balance of Payments Statistics)
- **Health & Public Safety Data** (e.g., epidemiological surveillance, crime reports)
- **Time-Series & Energy Data** (e.g., climate projections, electricity demand records)
- **Transport & Mobility Data** (e.g., road accident statistics, smart city traffic flow)
- **Other emerging dataset types** as identified in the text.
**Important:**
If the dataset does not fit into the examples above, infer the **most appropriate category** from the context and **create a new `"data_type"` if necessary**.
### **What Should NOT Be Extracted?**
Do **not** extract mentions that do not clearly refer to a dataset, including, but not limited to:
1. **Organizations & Institutions** (e.g., WHO, IMF, UNDP, "World Bank data" unless it explicitly refers to a dataset)
2. **Reports & Policy Documents** (e.g., "Fiscal Monitor by the IMF", "IEA Energy Report"; only extract if the dataset itself is referenced)
3. **Generic Mentions of Data** (e.g., "various sources", "survey results from multiple institutions")
4. **Economic Models & Policy Frameworks** (e.g., "GDP growth projections", "macroeconomic forecasts")
5. **Legislation & Agreements** (e.g., "Paris Agreement", "General Data Protection Regulation")
### **Rules for Extraction**
1. **Extract All Structured Data Mentions**
- If the dataset is explicitly named (e.g., "Global Fishing Watch"), label it as `"properly_named"`.
- If the dataset is described but not explicitly named (e.g., "electricity usage data from Albania"), label it as `"descriptive_but_unnamed"`.
- If the dataset mention is too generic (e.g., "electricity usage data"), label it as `"vague_generic"`.
2. **Ensure `"data_type"` Is Always Assigned**
- **Use an existing category if applicable.**
- **If no suitable category exists, create a new `"data_type"` based on context.**
3. **Classify `"context"` Correctly**
- `"primary"`: The dataset is used for direct analysis in the document.
- `"supporting"`: The dataset is referenced to validate or compare findings.
- `"background"`: The dataset is mentioned as general context or prior research.
**Examples:**
- `"The LSMS-ISA data is analyzed to assess the impact of agricultural practices on productivity."` → `"primary"`
- `"Our results align with previous studies that used LSMS-ISA."` → `"supporting"`
- `"LSMS-ISA is widely recognized as a reliable data source for agricultural research."` → `"background"`
4. **Capture Full Sentence Context**
- The `"mentioned_in"` field must always include the **full sentence** where the dataset is referenced.
- If a dataset is mistakenly extracted from an unrelated sentence, correct it.
### **Extraction Schema**
Each extracted dataset should have the following fields:
- `raw_name`: Exact dataset name from the text (**no paraphrasing**).
- `harmonized_name`: If properly named, use directly; if referenced in multiple ways, standardize using the most precise form in the text, otherwise, set this to None.
- `acronym`: Extract if explicitly mentioned.
- `mentioned_in`: **Full sentence** where the dataset appears (**no paraphrasing**).
- `context`: **primary / supporting / background**
- `specificity`: **properly_named / descriptive_but_unnamed / vague_generic**
- `relevance`: **directly_relevant / indirectly_relevant / not_relevant**
- `producer`: **Extract only if explicitly mentioned; otherwise, set to `None`.**
- `data_type`: **Assign based on existing categories, but create new ones if necessary.**
### **Handling New or Unlisted Data Types**
- If a dataset does not fit into existing categories, **infer an appropriate name** for its `"data_type"` based on context.
- Use a **general but informative label** for new data types (e.g., `"Climate Risk Data"`, `"Social Media Analytics"`).
### **Important: Do NOT Skip Unnamed Datasets**
If a dataset is described but lacks a proper name, extract it under `"descriptive_but_unnamed"` or `"vague_generic"`, whichever is appropriate.
If `"producer"` is not mentioned, set it to `None` rather than inferring."""
Creating Batches for Processing#
import os
# Directory for batch files & results
BATCH_DIR = "./openai-batchfiles/extraction" # Directory for batch files
OUTPUT_DIR = "./extraction_outputs" # Directory for OpenAI API responses
MODEL = "gpt-4o-mini"
# Ensure output folders exist
os.makedirs(BATCH_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
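With MODEL set, you can sanity-check the shape of a single request body before building full batch files; this is just a quick sketch using the prompt and schema defined earlier:
# Optional sanity check: inspect one request body (illustrative only)
example_request = build_payload(
    "The 2019 DHS survey for Ghana is analysed in this paper.",
    DATA_USE_TASK_PROMPT,
    LabelledResponseFormat,
)
print(example_request["model"], sorted(example_request.keys()))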
from tqdm.auto import tqdm
# Process Each PDF into Its Own Batch
def process_pdf_into_batch(pdf_fname, prompt, response_format, classifier, tokenizer):
"""Processes a single PDF file into an OpenAI batch file."""
file_basename = os.path.basename(pdf_fname).replace(".pdf", "")
batch_fname = os.path.join(BATCH_DIR, f"batch-{file_basename}.jsonl")
paths = generate_file_paths(file_basename)
if os.path.exists(batch_fname):
print(f"Batch file {batch_fname} already exists. Skipping.")
return batch_fname # Skip if already processed
pages = load_doc(pdf_fname, n_pages=1)
seen_custom_ids = set()
with open(batch_fname, "a+") as f:
for page in tqdm(pages, desc=f"Processing {file_basename}"):
raw_text = page.get("text")
page_text = clean_extracted_text(raw_text)
page_num = page.get("pages")[0]
save_texts(raw_text, page_text, paths, page_num)
if not page_text or not should_process_page(
page_text, classifier, tokenizer
):
continue
custom_id = f"{file_basename}-page-{page_num}"
if custom_id in seen_custom_ids:
continue
seen_custom_ids.add(custom_id)
batch_entry = dict(
custom_id=custom_id,
method="POST",
url="/v1/chat/completions",
body=build_payload(page_text, prompt, response_format),
)
f.write(json.dumps(batch_entry) + "\n")
print(f"Created batch file: {batch_fname}")
return batch_fname
First, you need to create an input directory and put your PDF files there.
import glob
INPUT_DIRECTORY = "./input/"
os.makedirs(INPUT_DIRECTORY, exist_ok=True)
PDF_FILES = glob.glob(INPUT_DIRECTORY + "/*.pdf")
PDF_FILES
import re
prompt = DATA_USE_TASK_PROMPT
response_format = LabelledResponseFormat
# loop through each PDF file and process it into a batch
for pdf_fname in PDF_FILES:
_ = process_pdf_into_batch(
pdf_fname, prompt, response_format, classifier, tokenizer
)
import os
# Directories
BATCH_DIR = "./openai-batchfiles/extraction"
MERGED_BATCH_DIR = "./batches" # Where we save the consolidated batch files
MAX_REQUESTS_PER_BATCH = 15 # Number of requests per batch
# Ensure merged batch directory exists
os.makedirs(MERGED_BATCH_DIR, exist_ok=True)
def consolidate_batches(
batch_dir=BATCH_DIR, batch_size=MAX_REQUESTS_PER_BATCH, process="extraction"
):
"""
Consolidates JSONL batch files into larger ones while preserving structure.
Args:
batch_dir (str): Directory containing batch files.
batch_size (int): Max requests per merged batch file.
Returns:
list: List of saved merged batch file paths.
"""
batch_files = [
os.path.join(batch_dir, f)
for f in os.listdir(batch_dir)
if f.endswith(".jsonl")
]
batch_files.sort()
merged_batches = []
current_batch = []
batch_count = 1
if process:
os.makedirs(os.path.join(MERGED_BATCH_DIR, process), exist_ok=True)
for batch_file in batch_files:
try:
with open(batch_file, "r") as f:
batch_data = [
json.loads(line) for line in f if line.strip()
] # Skip empty lines
except json.JSONDecodeError as e:
print(f"Skipping {batch_file} due to JSON decoding error: {e}")
continue # Skip corrupt files
for entry in batch_data:
current_batch.append(entry)
# If batch reaches the specified size, save and reset
if len(current_batch) >= batch_size:
merged_batch_file = os.path.join(
MERGED_BATCH_DIR, f"{process}/merged-batch-{batch_count}.jsonl"
)
with open(merged_batch_file, "w") as f_out:
f_out.write(
"\n".join(json.dumps(item) for item in current_batch) + "\n"
)
merged_batches.append(merged_batch_file)
print(f"Created merged batch: {merged_batch_file}")
# Reset for next batch
current_batch = []
batch_count += 1
# Save any remaining batch data
if current_batch:
merged_batch_file = os.path.join(
MERGED_BATCH_DIR, f"{process}/merged-batch-{batch_count}.jsonl"
)
with open(merged_batch_file, "w") as f_out:
f_out.write("\n".join(json.dumps(item) for item in current_batch) + "\n")
merged_batches.append(merged_batch_file)
print(f"Created merged batch: {merged_batch_file}")
return merged_batches
merged_batches = consolidate_batches(
batch_dir=BATCH_DIR, batch_size=15, process="extraction"
) # change your batch_size here (50K max for OpenAI)
merged_batches
Submitting JSONL Batches to OpenAI#
After creating and merging the JSONL batches, the next step is to submit these batches to the OpenAI API for processing. The following code demonstrates how to upload and submit the merged batch files to OpenAI:
import time
def submit_batches_to_openai(batch_filenames):
"""
Uploads and submits merged batch files to OpenAI.
Args:
batch_filenames (list): List of merged batch file paths.
"""
# batch_input_files = []
openai_batches = []
for batch_fname in tqdm(batch_filenames, desc="Uploading merged batch files"):
try:
with open(batch_fname, "rb") as file:
uploaded_file_id = client.files.create(file=file, purpose="batch").id
# Submit batch job
batch = client.batches.create(
input_file_id=uploaded_file_id,
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={"description": f"Merged batch processing: {batch_fname}"},
)
openai_batches.append(batch)
print(f"Submitted batch: {batch.id}")
except Exception as e:
print(f"Error processing {batch_fname}: {e}")
time.sleep(1)
return openai_batches
openai_batches = submit_batches_to_openai(merged_batches)
The batch jobs may take a while to complete.
You can keep checking until each batch’s status is ‘completed’.
# Helper function to list all submitted batches, their statuses.
def list_batches():
"""
Lists all submitted batches along with their statuses.
"""
try:
batches = client.batches.list()
print("All Batch Jobs:")
for batch in batches:
print(
f"Batch ID: {batch.id}, Status: {batch.status}, Created At: {batch.created_at}"
)
except Exception as e:
print(f"Error listing batches: {e}")
# list_batches()
for batch in openai_batches:
print(f"Batch ID: {batch.id}, Status: {batch.status}")
# once completed we can download the results
# for demo purposes, the following batch_ids are completed and ready for download
batch_ids = [
"batch_67b4adcc7a248190b5c339d0fba3a727",
"batch_67b4adc9d4c88190ab4eabdf6578552e",
]
def retrieve_batch_results(batch_ids, process="extraction"):
if process:
os.makedirs(f"./batches/{process}", exist_ok=True)
for batch_id in batch_ids:
batch_details = client.batches.retrieve(batch_id)
if batch_details.status == "completed":
result = client.files.content(batch_details.output_file_id).content
results_file = f"./batches/{process}/results-{batch_id}.jsonl"
with open(results_file, "wb") as file:
file.write(result)
print(f"saved results to {results_file}")
return None
_ = retrieve_batch_results(batch_ids, process="extraction")
def load_and_save_outputs(batch_files):
payload = []
for batch_file in batch_files:
# Loading data from saved file
with open(batch_file, "r") as file:
for line in file:
# Parsing the JSON string into a dict and appending to the list of results
json_object = json.loads(line.strip())
payload.append(json_object)
return payload
import glob
batch_files = glob.glob("./batches/extraction/results-*.jsonl")
payload = load_and_save_outputs(batch_files)
extraction_path = "./extraction_outputs/extraction"
raw_text_path = "./output/raw_text"
os.makedirs(extraction_path, exist_ok=True)
os.makedirs(raw_text_path, exist_ok=True)
def map_and_save_output(payload, extraction_path, raw_text_path):
for res in payload:
fname_origin = res["custom_id"].split("-page-")[0]
page = int(res["custom_id"].split("-page-")[-1])
page_str = str(page + 1)
content = json.loads(
res["response"]["body"]["choices"][0]["message"]["content"]
)
extraction_resfname = extraction_path + f"/{fname_origin}.json"
if content.get("dataset") != []:
with open(
raw_text_path + f"/{fname_origin}.json", "r", encoding="utf-8"
) as raw_txt:
raw_text = json.load(raw_txt).get("pages").get(page_str)
save_output_per_document(raw_text, content, extraction_resfname, page)
return None
_ = map_and_save_output(payload, extraction_path, raw_text_path)
# you can use the following function to inspect the json file
def inspect_json(json_file):
try:
with open(json_file, "r") as f:
results = [json.loads(line) for line in f]
return results
except Exception:
with open(json_file, "r", encoding="utf-8") as fn:
return json.load(fn)
Inspect the output for one of the processed documents:
json_file = (
"./extraction_outputs/extraction/06c998e896785ab8b6d6caa4a8beb2f505c375a5.json"
)
inspect_json(json_file)
LLM-as-a-Judge for Quality Assessment#
After extracting dataset mentions, we validate the output using an LLM-as-a-judge pipeline. This involves:
Validation Criteria: Assess dataset mentions as valid, invalid, or needing clarification.
Consistency Check: Ensure consistent validity unless context changes.
Context-Aware Inference: Extract missing details from the mentioned_in field.
Data Type Classification: Infer appropriate data types from context.
Producer Identification: Extract explicitly mentioned producers; otherwise, set to None.
Validation Criteria
A dataset is valid if:
Structured and systematically collected.
Reproducible, consisting of collected records.
Always Valid Datasets:
Government statistical and geospatial datasets.
Official surveys, administrative records, economic transaction data, and scientific research datasets.
Invalid Datasets:
Derived indicators or computational constructs.
Standalone statistical metrics without clear underlying data.
General organizations, reports, or methodologies.
Uncertain Cases:
Vaguely named but potentially valid: "Potentially valid—needs dataset name confirmation."
Too generic: "Needs clarification—dataset name is too generic."
Key Validation Rules
Consistency Check: Maintain validity unless context changes.
Context-Aware Inference: Extract missing details from the mentioned_in field.
Data Type Classification: Infer appropriate data types from context.
Producer Identification: Extract explicitly mentioned producers; otherwise, set to None.
Each dataset assessment must conform to the JudgeResponseFormat schema.
# Create a pydantic model for the judge response
from pydantic import model_validator
class JudgeDatasetEntry(BaseModel):
raw_name: Optional[str] = Field(
..., description="The exact dataset name as it appears in the text."
)
harmonized_name: Optional[str] = Field(
None, description="The standardized or full name of the dataset."
)
acronym: Optional[str] = Field(
None, description="The short name or acronym associated with the dataset."
)
context: Context
specificity: Specificity
relevance: Relevance
producer: Optional[str] = Field(
None, description="The organization responsible for producing the dataset."
)
data_type: Optional[str] = Field(
None, description="The type of data represented by the dataset."
)
year: Optional[str] = Field(
None,
description="The year associated with the dataset, if explicitly mentioned.",
)
valid: bool = Field(
..., description="True if the mention is valid, false otherwise."
)
invalid_reason: Optional[str] = Field(
None, description="Reason why the mention was invalid (if applicable)."
)
sent: Optional[str] = Field(
None, description="The exact sentence where the dataset is mentioned."
)
# entities: Optional[EmpiricalMention] = Field(None, description="Additional empirical context for the dataset.")
# Validator to ensure valid and invalid_reason consistency
    @model_validator(mode="after")
    def check_validity(self):
        if not self.valid and not self.invalid_reason:
            raise ValueError("If 'valid' is False, 'invalid_reason' must be provided.")
        return self
class JudgeDatasetGroup(BaseModel):
mentioned_in: Optional[str] = Field(
None, description="The exact text excerpt where the dataset is mentioned."
)
datasets: List[JudgeDatasetEntry] = Field(
..., description="A list of validated datasets mentioned in the paper."
)
class JudgeResponseFormat(BaseModel):
page_number: int = Field(..., description="The page number in the document.")
dataset_used: bool = Field(
...,
description="Flag indicating whether a valid dataset is mentioned in the page.",
)
data_mentions: List[JudgeDatasetGroup] = Field(
...,
description="A list of structured dataset information mentioned in the paper.",
)
# judge prompt
JUDGE_PROMPT = """You are an expert in dataset validation. Your task is to assess whether each dataset mention is **valid, invalid, or requires clarification**, ensuring correctness and consistency based on the dataset's **empirical context**.
---
### **Dataset Validation Criteria**
A dataset is **valid** if:
1. **It is structured**—collected systematically for research, policy, or administrative purposes.
2. **It is reproducible**—meaning it consists of collected records rather than being derived purely from computations or models.
**Always Valid Datasets:**
- Government statistical and geospatial datasets (e.g., census, official land records).
- Official surveys, administrative records, economic transaction data, and scientific research datasets.
**Invalid Datasets:**
Set as invalid all `"raw_name"` that belong under the following classes.
- Derived indicators or computational constructs (e.g., "wealth score", "mine dummy", "district total production").
- Standalone statistical metrics without a clear underlying dataset (e.g., "average income growth rate" without source data).
- General organizations, reports, or methodologies (e.g., "World Bank", "UNDP Report", "machine learning model").
**Uncertain Cases:**
- If a dataset is **vaguely named but potentially valid**, set it as valid but return: `"Potentially valid—needs dataset name confirmation."`
- If a dataset reference is **too generic** (e.g., `"time-varying data on production"`), set it as valid but return: `"Needs clarification—dataset name is too generic."`
---
### **Key Validation Rules**
1. **Consistency Check:**
- If a `"raw_name"` has been marked **valid earlier**, it **must remain valid** unless its meaning significantly differs in a new context.
2. **Context-Aware Inference:**
- If certain details are missing such as the **Year**, **Producer**, or **Data Type**, try to extract them from the `mentioned_in` field if available and correctly relate to the data.
3. **Data Type Classification (Flexible & Adaptive):**
- Infer the most appropriate `"data_type"` dynamically from context.
- Possible types: **Surveys, geospatial data, administrative records, financial reports, research datasets, climate observations, etc.**
- If **no predefined category fits**, create a **new `"data_type"` that best describes the dataset.**
4. **Producer Identification:**
- If the **producer (organization/institution) is explicitly mentioned**, extract it.
   - If not mentioned, **do not infer—set `"producer": None` instead.**
---
### **JudgeResponseFormat Schema**
Each dataset assessment must conform strictly to the JudgeResponseFormat schema."""
import os
# Directory for batch files & results
JUDGE_BATCH_DIR = "./openai-batchfiles/judge-batches" # Directory for batch files
JUDGE_OUTPUT_DIR = "./extraction_outputs/judge" # Directory for OpenAI API responses
MODEL = "gpt-4o-mini"
# Ensure output folders exist
os.makedirs(JUDGE_BATCH_DIR, exist_ok=True)
os.makedirs(JUDGE_OUTPUT_DIR, exist_ok=True)
def extracted_outputs_to_batch(extraction_fname, prompt, response_format):
"""Processes a single extraction file into a judge batch file."""
file_basename = os.path.basename(extraction_fname).replace(".json", "")
batch_fname = os.path.join(JUDGE_BATCH_DIR, f"judge-batch-{file_basename}.jsonl")
if os.path.exists(batch_fname):
print(f"Batch file {batch_fname} already exists. Skipping.")
return batch_fname # Skip if already processed
with open(extraction_fname, "r") as f:
extraction_data = json.load(f)
for page_data in extraction_data["pages"]:
page_num = page_data.get("page")
data = page_data.get("data_mentions")
if not data:
continue
custom_id = f"{file_basename}-page-{page_num}"
batch_entry = dict(
custom_id=custom_id,
method="POST",
url="/v1/chat/completions",
body=build_payload(json.dumps(data), prompt, response_format),
)
with open(batch_fname, "a+") as f:
f.write(json.dumps(batch_entry) + "\n")
print(f"Created batch file: {batch_fname}")
return batch_fname
EXTRACTION_DIRECTORY = "./extraction_outputs"
EXTRACTION_FILES = glob.glob(EXTRACTION_DIRECTORY + "/extraction/*.json")
judge_prompt = JUDGE_PROMPT
judge_response_format = JudgeResponseFormat
EXTRACTION_FILES
# loop through each PDF file and process it into a batch
for json_fname in EXTRACTION_FILES:
_ = extracted_outputs_to_batch(json_fname, judge_prompt, judge_response_format)
merged_judge_batches = consolidate_batches(
batch_dir=JUDGE_BATCH_DIR, batch_size=10, process="judge"
)
merged_judge_batches
openai_judge_batches = submit_batches_to_openai(merged_judge_batches) # submit
for judge_batch in openai_judge_batches:
print(judge_batch.id, client.batches.retrieve(judge_batch.id).status)
# for demo purposes, the following batch_ids are completed and ready for download
judge_batch_ids = [
"batch_67b5eccdd0788190b90d2eda5a0eb580",
"batch_67b5eccbca848190bf0d16ae8a8fa7ea",
]
_ = retrieve_batch_results(judge_batch_ids, process="judge")
batch_files = glob.glob("./batches/judge/results-*.jsonl")
judge_payload = load_and_save_outputs(batch_files)
judge_payload[0]
import os
def map_judge_output(judge_payload, judge_path):
os.makedirs(judge_path, exist_ok=True)
for res in judge_payload:
fname_origin = res["custom_id"].split("-page-")[0]
page_number = int(res["custom_id"].split("-page-")[1])
content = json.loads(
res["response"]["body"]["choices"][0]["message"]["content"]
)
extraction_resfname = os.path.join(judge_path, f"{fname_origin}.json")
if content.get("data_mentions"):
# Construct the page entry
page_entry = {
"page": page_number,
"dataset_used": bool(content.get("data_mentions")),
"data_mentions": content["data_mentions"],
}
# Read existing data
if os.path.exists(extraction_resfname):
with open(extraction_resfname, "r", encoding="utf-8") as f:
try:
existing_data = json.load(f)
except json.JSONDecodeError:
existing_data = {
"source": fname_origin,
"pages": [],
} # Handle empty or corrupted file
else:
existing_data = {"source": fname_origin, "pages": []}
# Ensure the existing structure is correct
if "pages" not in existing_data:
existing_data["pages"] = []
# Append new page entry
existing_data["pages"].append(page_entry)
# Write back to file
with open(extraction_resfname, "w", encoding="utf-8") as f:
json.dump(existing_data, f, indent=4, ensure_ascii=False)
return None
judge_path = "./extraction_outputs/judge"
_ = map_judge_output(judge_payload, judge_path)
Autonomous Reasoning Agent#
Once the information is validated by the LLM, we will use the autonomous reasoning agent to further refine and validate the extracted data. The reasoning agent will follow a structured prompt to ensure the accuracy and relevance of the dataset mentions.
THINKING_PROMPT = """Your task is to review a structured user input that may mention a dataset in a text. Please take your time.
Carefully analyze what the text in the `mentioned_in` field explicitly means and in what context the `raw_name` is discussed. Never infer, imply, or assume, so you must exclusively rely on the text as facts. If there are multiple datasets, do the assessment individually.
Plan a strategy to ensure you can maximize the chances of correctly judging and classifying whether the provided input:
- Clearly, the `raw_name` falls under the concept of a data/dataset and not by extension or implicitly.
- Whether the raw_name is actually in the `mentioned_in`.
- Whether the harmonized_name (if present) is actually in the `mentioned_in`. If not found, remove it from the output.
- The `raw_name` is `properly_named` (e.g., DHS, LSMS, etc.), `descriptive_but_unnamed` (e.g., administrative school records in Ghana for 2020), or `vague_generic` (e.g., survey data). Any of these are valid data mentions. To be sure, elaborate how you interpret these classes and use that for classifying.
- The context concerning usage of the dataset is mentioned: is it `primary`, `supporting`, or `background`.
Then, write down your strategy.
After you write down your strategy, synthesize it to develop a rubric of what qualifies as a dataset, which you must use to base your judgment.
Incorporate a devil's advocate review as part of your strategy. If the review shows inconsistency, update accordingly. Do not reason based on assumption, inference, or implicit thinking. Relationships do not count as a dataset; for example, the producer is not a dataset.
Execute the strategy, **step by step**, and write an analysis of how you interpret the `raw_name` in the context of the `mentioned_in`.
If your analysis results in the `raw_name` being a dataset, set the `valid` field to `true`, otherwise, set it to `false`. In both cases, return the result of your analysis focusing on the `raw_name` in the `reason` field. If it is invalid, set the `specificity` and `context` to null.
ALWAYS WRITE A DEVIL'S ADVOCATE REVIEW AFTER THE ANALYSIS BEFORE CONCLUDING.
After you write your analysis, your output must repeat the input with the `specificity`, `context`, `valid` and `invalid_reason` values replaced accordingly in the same level as the corresponding `raw_name`. IMPORTANT: the final output must be between these tags <OUTPUTDATA>```json<the output must be here>```</OUTPUTDATA>"""
# we are only interested in valid mentions of data from the judge llm
def filter_valid_mentions(validated_input):
# Filter out invalid datasets before passing to LLM
filtered_mentions = []
for page in validated_input["pages"]:
for mention in page.get("data_mentions", []):
valid_datasets = [
dataset
for dataset in mention["datasets"]
if dataset.get("valid", False)
]
if valid_datasets: # Only keep mentions with at least one valid dataset
filtered_mentions.append(
{
"mentioned_in": mention["mentioned_in"],
"datasets": valid_datasets,
"page": page["page"],
"dataset_used": page["dataset_used"],
}
)
input_data = {
"source": validated_input.get("source"),
"data_mentions": filtered_mentions,
}
return input_data
def prepare_input_data(data):
for mention in data.get("data_mentions", []):
for ds in mention.get("datasets", []):
# Replace string "None" with actual None
if ds.get("producer") == "None":
ds["producer"] = None
# Remove unwanted keys
keys_to_remove = [
"sent",
"specificity",
"context",
"relevance",
"data_type",
"valid",
"invalid_reason",
]
for key in keys_to_remove:
ds.pop(key, None) # `None` as default to avoid KeyError
return data
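To make the filtering concrete, here is a tiny made-up judge output run through both helpers; only the valid mention survives, and its string "None" producer becomes a real None:
# Illustrative only: a minimal, made-up judge output
toy_judge_output = {
    "source": "example-doc",
    "pages": [
        {
            "page": 3,
            "dataset_used": True,
            "data_mentions": [
                {
                    "mentioned_in": "The DHS 2019 survey is analysed alongside a derived wealth score.",
                    "datasets": [
                        {"raw_name": "DHS 2019", "valid": True, "producer": "None", "context": "primary"},
                        {"raw_name": "wealth score", "valid": False, "invalid_reason": "Derived indicator"},
                    ],
                }
            ],
        }
    ],
}
prepared = prepare_input_data(filter_valid_mentions(toy_judge_output))
print(json.dumps(prepared, indent=2))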
import os
# Directory for batch files & results
REASONING_BATCH_DIR = (
"./openai-batchfiles/reasoning-batches" # Directory for batch files
)
REASONING_OUTPUT_DIR = (
"./extraction_outputs/reasoning" # Directory for OpenAI API responses
)
MODEL = "gpt-4o-mini"
# Ensure output folders exist
os.makedirs(REASONING_BATCH_DIR, exist_ok=True)
os.makedirs(REASONING_OUTPUT_DIR, exist_ok=True)
from hashlib import md5
def judge_outputs_to_batch(judge_fname, prompt):
"""Processes a single judge file into a reasoning batch file."""
file_basename = os.path.basename(judge_fname).replace(".json", "")
batch_fname = os.path.join(
REASONING_BATCH_DIR, f"reasoning-batch-{file_basename}.jsonl"
)
if os.path.exists(batch_fname):
print(f"Batch file {batch_fname} already exists. Skipping.")
return batch_fname # Skip if already processed
with open(judge_fname, "r") as f:
judge_data = json.load(f)
filtered_data = filter_valid_mentions(judge_data)
prepared_data = prepare_input_data(filtered_data)
for page_data in prepared_data["data_mentions"]:
mention_hash = page_data.get("mentioned_in")
if not mention_hash:
continue
mhash = md5(mention_hash.encode()).hexdigest()[:8]
custom_id = f"{file_basename}_{mhash}-pg-{page_data['page']}"
batch_entry = dict(
custom_id=custom_id,
method="POST",
url="/v1/chat/completions",
body=build_payload(json.dumps(page_data), prompt),
)
with open(batch_fname, "a+") as f:
f.write(json.dumps(batch_entry) + "\n")
print(f"Created batch file: {batch_fname}")
return batch_fname
EXTRACTION_DIRECTORY = "./extraction_outputs"
JUDGE_FILES = glob.glob(EXTRACTION_DIRECTORY + "/judge/*.json")
reasoning_prompt = THINKING_PROMPT
# loop through each PDF file and process it into a batch
for json_fname in JUDGE_FILES:
_ = judge_outputs_to_batch(json_fname, reasoning_prompt)
openai_reasoning_batches = consolidate_batches(
batch_dir=REASONING_BATCH_DIR, batch_size=15, process="reasoning"
)
openai_reasoning_batches = submit_batches_to_openai(openai_reasoning_batches)
for reasoning_batch in openai_reasoning_batches:
print(client.batches.retrieve(reasoning_batch.id).status, reasoning_batch.id)
# openai_reasoning_batches = [batch.id for batch in openai_reasoning_batches]
# for demo purposes, the following batch_ids are completed and ready for download
reasoning_batch_ids = [
"batch_67b6c639468c8190b87726743431e76f",
"batch_67b6c63b48848190be8cfc7fa018300c",
]
_ = retrieve_batch_results(reasoning_batch_ids, process="reasoning")
batch_files_reasoning = glob.glob("./batches/reasoning/results-*.jsonl")
reasoning_payload = load_and_save_outputs(batch_files_reasoning)
#!pip install beautifulsoup4
from bs4 import BeautifulSoup
def map_reasoning_output(reasoning_payload, reasoning_path):
os.makedirs(reasoning_path, exist_ok=True)
for res in reasoning_payload:
fname_origin = res["custom_id"].split("_")[0]
content = res["response"]["body"]["choices"][0]["message"]["content"]
reasoning_fname = os.path.join(reasoning_path, f"{fname_origin}.json")
content = content[content.index("```json") + len("```json") :]
content = content[: content.index("```")]
if "<outputdata>" in content.lower() and "</outputdata>" in content.lower():
soup = BeautifulSoup(content, "html.parser")
content = soup.find("outputdata").text
content = json.loads(content)
# Construct the page entry
filtered_data = []
filtered_data.append(
{
"mentioned_in": content["mentioned_in"],
"page": content["page"],
"dataset_used": content["dataset_used"],
"datasets": [
dataset
for dataset in content.get("datasets")
if dataset.get("valid", False)
],
}
)
page_entry = {
"data_mentions": filtered_data # we keep only valid: true entries
}
if os.path.exists(reasoning_fname):
with open(reasoning_fname, "r", encoding="utf-8") as f:
try:
existing_data = json.load(f)
except json.JSONDecodeError:
existing_data = {
"source": fname_origin,
"pages": [],
} # Handle empty or corrupted file
else:
existing_data = {"source": fname_origin, "pages": []}
# Ensure the existing structure is correct
if "pages" not in existing_data:
existing_data["pages"] = []
# Append new page entry
existing_data["pages"].append(page_entry)
# Write back to file
with open(reasoning_fname, "w", encoding="utf-8") as f:
json.dump(existing_data, f, indent=4, ensure_ascii=False)
return None
reasoning_path = "./extraction_outputs/reasoning"
_ = map_reasoning_output(reasoning_payload, reasoning_path)
inspect_json(
"./extraction_outputs/reasoning/The-local-socioeconomic-effects-of-gold-mining-evidence-from-Ghana.json"
)
That’s it! You now have validated, structured dataset mentions that you can turn into a fine-tuning dataset. This notebook has walked you through a batch processing implementation for data labeling of climate change-related documents, automating the pipeline from PDF ingestion through extraction, LLM-as-a-judge validation, and reasoning-based refinement.
To build your fine-tuning dataset from these outputs, you can check this notebook to learn more.