Batch Processing Implementation#

Introduction#

This notebook demonstrates a comprehensive batch processing implementation for data labeling of climate change-related documents. The workflow involves several key steps, including loading and preprocessing documents, extracting relevant dataset mentions, validating the extracted mentions, and finally, using an autonomous reasoning agent to ensure the quality and accuracy of the extracted data.

The primary goal is to identify and categorize dataset mentions from research papers and policy documents, ensuring they are correctly classified based on naming specificity, context, and relevance. The process leverages several tools and libraries, including the OpenAI API, Hugging Face Transformers, and PyMuPDF, to automate and streamline data labeling.

The notebook is structured as follows:

  1. Loading and Preprocessing: Load documents from files or URLs and preprocess the text for extraction.

  2. Extraction: Prefilter pages with a pre-trained classifier for efficiency, then extract dataset mentions from the text.

  3. Validation: Validate the extracted mentions using a judge model to ensure correctness.

  4. Reasoning: Apply an autonomous reasoning agent to further refine and validate the extracted data.

  5. Batch Processing: Create and submit batch files for processing by the OpenAI API.

  6. Results Consolidation: Consolidate and save the results for further analysis.

This workflow aims to achieve accurate and reliable data labeling for climate change-related documents, facilitating further research and analysis in this critical field.

# install the required packages (torch is needed so the transformers pipeline can load the classifier;
# nltk, scikit-learn, networkx, and beautifulsoup4 are used by helper functions later in the notebook)
!pip install --upgrade openai pymupdf transformers torch python-dotenv nltk scikit-learn networkx beautifulsoup4
Requirement already satisfied: openai in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (1.75.0)
Requirement already satisfied: pymupdf in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (1.25.5)
Requirement already satisfied: transformers in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (4.51.3)
Requirement already satisfied: python-dotenv in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (1.1.0)
Requirement already satisfied: anyio<5,>=3.5.0 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from openai) (4.9.0)
Requirement already satisfied: distro<2,>=1.7.0 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from openai) (1.9.0)
Requirement already satisfied: httpx<1,>=0.23.0 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from openai) (0.28.1)
Requirement already satisfied: jiter<1,>=0.4.0 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from openai) (0.9.0)
Requirement already satisfied: pydantic<3,>=1.9.0 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from openai) (2.11.3)
Requirement already satisfied: sniffio in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from openai) (1.3.1)
Requirement already satisfied: tqdm>4 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from openai) (4.67.1)
Requirement already satisfied: typing-extensions<5,>=4.11 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from openai) (4.13.2)
Requirement already satisfied: filelock in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from transformers) (3.18.0)
Requirement already satisfied: huggingface-hub<1.0,>=0.30.0 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from transformers) (0.30.2)
Requirement already satisfied: numpy>=1.17 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from transformers) (2.2.4)
Requirement already satisfied: packaging>=20.0 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from transformers) (24.2)
Requirement already satisfied: pyyaml>=5.1 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from transformers) (6.0.2)
Requirement already satisfied: regex!=2019.12.17 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from transformers) (2024.11.6)
Requirement already satisfied: requests in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from transformers) (2.32.3)
Requirement already satisfied: tokenizers<0.22,>=0.21 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from transformers) (0.21.1)
Requirement already satisfied: safetensors>=0.4.3 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from transformers) (0.5.3)
Requirement already satisfied: idna>=2.8 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from anyio<5,>=3.5.0->openai) (3.10)
Requirement already satisfied: certifi in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from httpx<1,>=0.23.0->openai) (2025.1.31)
Requirement already satisfied: httpcore==1.* in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from httpx<1,>=0.23.0->openai) (1.0.8)
Requirement already satisfied: h11<0.15,>=0.13 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from httpcore==1.*->httpx<1,>=0.23.0->openai) (0.14.0)
Requirement already satisfied: fsspec>=2023.5.0 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from huggingface-hub<1.0,>=0.30.0->transformers) (2025.3.2)
Requirement already satisfied: annotated-types>=0.6.0 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from pydantic<3,>=1.9.0->openai) (0.7.0)
Requirement already satisfied: pydantic-core==2.33.1 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from pydantic<3,>=1.9.0->openai) (2.33.1)
Requirement already satisfied: typing-inspection>=0.4.0 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from pydantic<3,>=1.9.0->openai) (0.4.0)
Requirement already satisfied: charset-normalizer<4,>=2 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from requests->transformers) (3.4.1)
Requirement already satisfied: urllib3<3,>=1.21.1 in /opt/hostedtoolcache/Python/3.11.12/x64/lib/python3.11/site-packages (from requests->transformers) (2.4.0)
from openai import OpenAI

api_key = "YOUR_API_KEY"
client = OpenAI(api_key=api_key)
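
Alternatively, since python-dotenv is installed above, you can keep the key out of the notebook. A minimal sketch, assuming a local .env file that contains a line like OPENAI_API_KEY=...:

# optional: read the API key from a .env file instead of hardcoding it
import os

from dotenv import load_dotenv

load_dotenv()  # loads variables from .env in the working directory, if present
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))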

Load our prefiltering model from Hugging Face

# silence warnings

import warnings

warnings.filterwarnings("ignore")
from transformers import AutoTokenizer
from transformers import pipeline
import os

os.environ["TOKENIZERS_PARALLELISM"] = "true"

# load tokenizer from huggingface.co/models using our repository id
data_model_id = "ai4data-use/bert-base-uncased-data-use"
tokenizer = AutoTokenizer.from_pretrained(data_model_id)

# load the model from huggingface.co/models using our repository id
classifier = pipeline("text-classification", model=data_model_id, tokenizer=tokenizer)
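
If the pipeline loads successfully (it requires PyTorch, included in the install cell above), a quick smoke test on a made-up sentence looks like the sketch below. The exact label names depend on the model card; the prefilter later in the notebook only checks for labels other than NO_DATA.

# hypothetical smoke test: classify a single made-up sentence
example_sentence = "The LSMS-ISA survey data is analyzed to assess agricultural productivity."
print(classifier(example_sentence))  # e.g. [{'label': ..., 'score': ...}]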

Load Helper Functions

def chunk_text(text, tokenizer, max_length=500):
    """
    Split the text into chunks of max_length tokens, ensuring no chunk exceeds the model's token limit,
    and includes special tokens properly.

    Args:
        text (str): The input text to be chunked.
        tokenizer: The tokenizer to use for encoding and decoding.
        max_length (int): The maximum length of tokens allowed in each chunk, including special tokens.

    Returns:
        list: A list of text chunks as strings.
    """
    # Reserve space for special tokens (e.g., [CLS], [SEP])
    special_tokens_count = 2  # Adjust based on the tokenizer's special token usage
    chunk_size = max_length - special_tokens_count

    # Tokenize the text into token IDs without truncation
    tokens = tokenizer.encode(text, add_special_tokens=False)

    # Split the tokens into chunks
    chunks = []
    for i in range(0, len(tokens), chunk_size):
        token_chunk = tokens[i : i + chunk_size]
        # Add special tokens to the chunk
        token_chunk_with_specials = (
            [tokenizer.cls_token_id] + token_chunk + [tokenizer.sep_token_id]
        )
        # Decode the chunk back to text (avoid shadowing the enclosing function's name)
        decoded_chunk = tokenizer.decode(
            token_chunk_with_specials, skip_special_tokens=False
        )
        chunks.append(decoded_chunk)

    return chunks
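
A quick, hypothetical usage of chunk_text (the repeated sentence is made up), just to confirm that long text is split into multiple chunks:

# hypothetical check: a long synthetic text should be split into several chunks
long_text = "Electricity demand records were combined with census data for the analysis. " * 300
chunks = chunk_text(long_text, tokenizer, max_length=500)
print(f"{len(chunks)} chunks produced")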
import re


def clean_extracted_text(text):
    """
    Cleans text extracted from PDFs using PyMuPDF.
    - Reduces unnecessary whitespace and artifacts while preserving meaningful structure.
    - Prevents unintentional removal of spaces or concatenation of words.
    """

    # Replace non-breaking spaces (\xa0) with regular spaces
    text = text.replace("\xa0", " ")

    # Remove control characters (ASCII 0-31) except line breaks
    text = re.sub(r"[\x00-\x08\x0B-\x1F]", "", text)

    # Collapse excessive newlines (more than 2) but preserve single newlines
    text = re.sub(r"\n{3,}", "\n\n", text)

    # Collapse multiple spaces but preserve single spaces between words
    text = re.sub(r"[ \t]{2,}", " ", text)

    # Rejoin words hyphenated across line breaks (e.g., "address-\nclimate" to "address-climate")
    text = re.sub(r"([a-zA-Z])-\n([a-zA-Z])", r"\1-\2", text)

    # Trim leading/trailing spaces and newlines
    text = text.strip()

    return text
from typing import Callable


def should_process_page(text: str, classifier: Callable, tokenizer) -> bool:
    """Determine whether a page should be processed."""

    chunks = chunk_text(text, tokenizer, max_length=500)
    return any(classifier(chunk)[0]["label"] != "NO_DATA" for chunk in chunks)
def save_text_per_document(text, text_output_path, page_idx):
    """
    Save cleaned text for each page to a single JSON file, appending page data.

    Parameters:
        text (str): The cleaned text for the current page.
        text_output_path (str): The path to the text JSON file.
        page_idx (int): The current page index.

    Returns:
        None
    """
    # Load existing text data or create a new structure
    if os.path.exists(text_output_path):
        with open(text_output_path, "r", encoding="utf-8") as existing_file:
            text_data = json.load(existing_file)
    else:
        text_data = {
            "source": os.path.splitext(os.path.basename(text_output_path))[0],
            "pages": {},
        }

    # Add text for the current page
    text_data["pages"][str(page_idx + 1)] = text

    # Save the updated text data
    os.makedirs(os.path.dirname(text_output_path), exist_ok=True)
    with open(text_output_path, "w", encoding="utf-8") as text_file:
        json.dump(text_data, text_file, indent=4)
def save_texts(raw_text: str, text: str, paths: dict, page_idx: int):
    """Save raw and cleaned text for the page."""
    save_text_per_document(raw_text, paths["raw_text_output"], page_idx)
    save_text_per_document(text, paths["text_output"], page_idx)
def generate_file_paths(base_name: str):
    """Generate file paths for saving outputs."""
    return {
        "text_output": f"output/text/{base_name}.json",
        "raw_text_output": f"output/raw_text/{base_name}.json",
    }
import json
import os

import nltk
from nltk.tokenize import sent_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# sent_tokenize needs the NLTK punkt data; download it once if missing
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)  # newer NLTK versions use punkt_tab


def find_best_matching_span(text, snippet, window: int = 1):
    sents = sent_tokenize(text)
    tfidf = TfidfVectorizer(ngram_range=(1, 3))
    mi_vec = tfidf.fit_transform([snippet])
    sents_vec = tfidf.transform(sents)

    mx_idx = cosine_similarity(mi_vec, sents_vec).flatten().argmax()
    span_sents = sents[max(mx_idx - window, 0) : min(mx_idx + window + 1, len(sents))]

    return {
        "match_idx": mx_idx,
        "match_sent": sents[mx_idx],
        "match_span_sents": span_sents,
        "match_span": " ".join(span_sents),
    }


def find_empirical_span(
    text: str, sentences: list, best_match_idx: int, window: int = 1
):
    # Define the start and end indices to include adjacent sentences for context
    start_idx = text.index(sentences[max(best_match_idx - window, 0)])
    last_sent = sentences[min(best_match_idx + window, len(sentences) - 1)]
    # NOTE: last_sent may also occur earlier in the text, so the search below
    # starts from start_idx to avoid matching an earlier duplicate.
    end_idx = start_idx + text[start_idx:].index(last_sent) + len(last_sent)

    # Extract the final span
    context_span = text[start_idx:end_idx]

    return {
        "empirical_span": context_span,  # Extracted span
        "start_idx": start_idx,
        "end_idx": end_idx,
    }


def get_empirical_mentioned_in(
    text, mentioned_in, window: int = 1, with_match_output: bool = False
):
    """
    Extract the most relevant span of text from the original document (`text`)
    that matches the `mentioned_in` field. Returns the span, label, start, and end indices.
    """
    # Tokenize the text into sentences
    sentences = sent_tokenize(text)
    match_output = find_best_matching_span(text, mentioned_in, window=window)
    best_match_idx = match_output["match_idx"]

    output = find_empirical_span(text, sentences, best_match_idx, window=window)
    output["empirical_mentioned_in"] = output.pop("empirical_span")

    output = {
        "label": "mentioned_in",  # Label as "mentioned_in"
        **output,
    }

    if with_match_output:
        output.update(match_output)

    return output
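
To illustrate how the TF-IDF matching recovers a span, here is a minimal, hypothetical example (the toy text and snippet are made up; it relies on the NLTK punkt data downloaded above):

toy_text = (
    "Climate policy analysis draws on many sources. "
    "The LSMS-ISA survey data is analyzed to assess agricultural productivity. "
    "The results are then compared against earlier studies."
)
toy_mention = "LSMS-ISA survey data is analyzed"
span_info = get_empirical_mentioned_in(toy_text, toy_mention, window=1)
print(span_info["empirical_mentioned_in"])  # the matched sentence plus its neighbours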
# load helper functions

from copy import deepcopy
import networkx as nx


def consolidate_dataset(raw_text: str, data: dict):
    text = raw_text
    page_data = {"dataset_used": data.get("dataset_used", False), "data_mentions": []}

    G = nx.Graph()
    sents = sent_tokenize(text)
    _datasets = []

    for ds in data.get("dataset", []):
        mentioned_in = ds.pop("mentioned_in") or ""

        try:
            mi = find_best_matching_span(mentioned_in, ds["raw_name"], window=0)
            mi = mi["match_span"]
            match_output = find_best_matching_span(text, mi, window=1)
        except ValueError:
            # Likely that the `mentioned_in` is not found in the text or not correct.
            # We try expanding the search to the entire text.
            match_output = find_best_matching_span(text, ds["raw_name"], window=1)

        ds["sent_spans"] = match_output["match_span_sents"]
        sents_idx = sorted([sents.index(s) for s in ds["sent_spans"]])
        ds["sent"] = match_output["match_sent"]
        ds["sent_idx"] = sents_idx

        # Add nodes explicitly so single-sentence spans still form their own components
        G.add_nodes_from(sents_idx)
        G.add_edges_from(zip(sents_idx[:-1], sents_idx[1:]))
        _datasets.append(ds)

    _datasets = sorted(_datasets, key=lambda x: x["sent_idx"][0])

    # The connected components in the graphs form the `mentioned_in`s.
    mentioned_ins = sorted(
        [sorted(x) for x in nx.connected_components(G)], key=lambda x: x[0]
    )
    updated_mentions = []

    for midx in mentioned_ins:
        _mi = {"mentioned_in": " ".join([sents[i] for i in midx]), "datasets": []}

        for ds in _datasets:
            ds = deepcopy(ds)
            if ds["sent_idx"][0] in midx:
                ds.pop("sent_idx")
                ds.pop("sent_spans")
                _mi["datasets"].append(ds)

        updated_mentions.append(_mi)

    page_data["data_mentions"] = updated_mentions

    return page_data


def save_output_per_document(raw_text, data, output_path, page_idx):
    """
    Save output data to a JSON file per document, appending new page data.

    Parameters:
        data (LabelledResponseFormat): The data to save, in the validated format.
        output_path (str): The output path for the document-wide JSON file.
        page_idx (int): The current page index being processed.

    Returns:
        None
    """

    # Restructure and consolidate dataset if possible
    page_data = consolidate_dataset(raw_text, data)

    # Initialize the new page's data structure
    page_data = {"page": page_idx + 1, **page_data}

    # Check if the file already exists
    if os.path.exists(output_path):
        with open(output_path, "r", encoding="utf-8") as existing_file:
            document_data = json.load(existing_file)
    else:
        # Create a new JSON structure
        document_data = {
            "source": os.path.splitext(os.path.basename(output_path))[0],
            "pages": [],
        }

    # Append the new page data
    document_data["pages"].append(page_data)

    # Save the updated document data back to the file
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as output_file:
        json.dump(document_data, output_file, indent=4)
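
To see how consolidate_dataset groups nearby mentions via graph connected components, here is a small, hypothetical example (the text and extraction dict are made up):

toy_text = (
    "The DHS survey was conducted in 2019. "
    "It was combined with Sentinel-2 imagery for land cover analysis. "
    "An unrelated discussion follows in the rest of the page."
)
toy_extraction = {
    "dataset_used": True,
    "dataset": [
        {
            "raw_name": "DHS survey",
            "mentioned_in": "The DHS survey was conducted in 2019.",
        },
        {
            "raw_name": "Sentinel-2 imagery",
            "mentioned_in": "It was combined with Sentinel-2 imagery for land cover analysis.",
        },
    ],
}
# both datasets land in a single merged `mentioned_in` group because their sentence windows overlap
print(json.dumps(consolidate_dataset(toy_text, toy_extraction), indent=2))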
# NOTE: this is a private helper inside the openai package; it converts a Pydantic
# model into the `response_format` parameter expected by the Chat Completions API.
from openai.lib._parsing._completions import type_to_response_format_param


def build_payload(page, prompt, response_format=None):
    """
    Constructs a properly formatted OpenAI API payload for batch processing.

    Args:
        page (str): Text content of the page.
        prompt (str): System prompt for guidance.
        response_format (BaseModel, optional): Pydantic model describing the expected structured output.

    Returns:
        dict: JSON payload formatted for OpenAI batch API.
    """
    default_kwargs = dict(
        model=MODEL,
        temperature=0,
        max_tokens=16383,  # maximum number of tokens the model may generate for the response
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
        seed=42,
    )

    # Ensure response_format uses JSON schema from LabelledResponseFormat
    payload = dict(
        messages=[
            {"role": "system", "content": [{"type": "text", "text": prompt}]},
            {"role": "user", "content": [{"type": "text", "text": page}]},
        ],
        # response_format={"type": "json_schema", "schema": response_format.model_json_schema()},  # to_strict_json_schema
        response_format=type_to_response_format_param(response_format)
        if response_format
        else {"type": "text"},
        **default_kwargs,
    )

    return payload
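
As a quick illustration of the payload shape, here is a hypothetical one-sentence page; MODEL is set here for the example and is defined again further down in the notebook:

MODEL = "gpt-4o-mini"  # also set later in the notebook
example_payload = build_payload(
    page="The LSMS-ISA data is analyzed to assess agricultural productivity.",
    prompt="Extract dataset mentions from the text.",
    response_format=None,  # None falls back to a plain-text response
)
print(json.dumps(example_payload, indent=2))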
import pymupdf
import requests
import tempfile


def load_doc(fname_or_url: str, n_pages: int = 1) -> list:
    """
    Loads a PDF document from a file or URL and extracts content from it.

    Args:
        fname_or_url (str): The path to the PDF file or a URL where the file can be downloaded.
        n_pages (int, optional): The number of pages to extract. Defaults to 1.

    Returns:
        list: A list of dictionaries containing the extracted text and page indices.

    Raises:
        AssertionError: If the number of pages is not greater than 0.
        Exception: If the PDF file fails to download from the specified URL or if there's an issue loading the document.
    """

    # Validate that the number of pages is greater than 0
    assert n_pages > 0, "The number of pages must be greater than 0."

    def _load_doc(fname: str) -> list:
        """
        Extracts content from n_pages successive pages at a time.

        Args:
            fname (str): The path to the PDF file.

        Returns:
            list: A list of dictionaries containing the extracted text and page indices.
        """

        # Initialize an empty list to store the contents
        contents = []

        # Open the PDF document
        doc = pymupdf.open(fname)

        # Iterate over the pages, skipping the last n_pages - 1 pages
        for page_idx in range(len(doc) - (n_pages - 1)):
            # Extract text from each of the next n_pages pages and store it as a dictionary
            contents.append(
                dict(
                    text="\n\n".join(
                        [doc[page_idx + i].get_text() for i in range(n_pages)]
                    ),
                    pages=[page_idx + i for i in range(n_pages)],
                )
            )

        # Validate that all pages were loaded successfully
        assert len(doc) - (n_pages - 1) == len(contents), "Failed to load all pages."

        return contents

    # Check if the file or URL starts with 'http:' or 'https:'
    if fname_or_url.startswith(("http:", "https:")):
        # Download the PDF file from the specified URL
        with tempfile.NamedTemporaryFile(suffix=".pdf") as temp_pdf:
            response = requests.get(fname_or_url, stream=True)
            if response.status_code == 200:
                # Write the downloaded data to the temporary file
                for chunk in response.iter_content(chunk_size=8192):
                    temp_pdf.write(chunk)
                # Seek back to the beginning of the file and return the loaded document
                temp_pdf.seek(0)
                return _load_doc(temp_pdf.name)
            else:
                # Raise an exception if there's an issue with the download or loading the document
                raise Exception(
                    f"Failed to download PDF, status code: {response.status_code}"
                )

    else:
        # If it's not a URL, simply load the document from the specified file path
        return _load_doc(fname_or_url)
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum


# Define Enums for categorical fields
class Context(str, Enum):
    background = "background"
    supporting = "supporting"
    primary = "primary"


class Specificity(str, Enum):
    properly_named = "properly_named"
    descriptive_but_unnamed = "descriptive_but_unnamed"
    vague_generic = "vague_generic"


class Relevance(str, Enum):
    directly_relevant = "directly_relevant"
    indirectly_relevant = "indirectly_relevant"
    not_relevant = "not_relevant"


class DatasetEntry(BaseModel):
    raw_name: Optional[str] = Field(
        ..., description="The exact dataset name as it appears in the text."
    )
    harmonized_name: Optional[str] = Field(
        None, description="The standardized or full name of the dataset."
    )
    acronym: Optional[str] = Field(
        None, description="The short name or acronym associated with the dataset."
    )
    context: Context
    specificity: Specificity
    relevance: Relevance
    mentioned_in: Optional[str] = Field(
        None, description="The exact text excerpt where the dataset is mentioned."
    )
    producer: Optional[str] = Field(
        None, description="The organization responsible for producing the dataset."
    )
    data_type: Optional[str] = Field(
        None, description="The type of data represented by the dataset."
    )


class LabelledResponseFormat(BaseModel):
    dataset: List[DatasetEntry] = Field(
        ..., description="A list of datasets mentioned in the paper."
    )
    dataset_used: bool = Field(
        ..., description="A boolean indicating if a dataset is used in the paper."
    )
DATA_USE_TASK_PROMPT = """You are an expert in extracting and categorizing dataset mentions from research papers and policy documents. Your task is to **identify and extract all valid dataset mentions**, ensuring they are correctly classified based on naming specificity, context, and relevance.

### **What Qualifies as a Dataset?**
A dataset is a structured collection of data used for empirical research, analysis, or policy-making. Examples include:
- **Surveys & Census Data** (e.g., LSMS, DHS, national census records)
- **Indicators & Indexes** (e.g., HDI, GFSI, WDI, ND-GAIN, EPI)
- **Geospatial & Environmental Data** (e.g., OpenStreetMap, Sentinel-2 imagery)
- **Economic & Trade Data** (e.g., UN Comtrade, Balance of Payments Statistics)
- **Health & Public Safety Data** (e.g., epidemiological surveillance, crime reports)
- **Time-Series & Energy Data** (e.g., climate projections, electricity demand records)
- **Transport & Mobility Data** (e.g., road accident statistics, smart city traffic flow)
- **Other emerging dataset types** as identified in the text.

**Important:**  
If the dataset does not fit into the examples above, infer the **most appropriate category** from the context and **create a new `"data_type"` if necessary**.

### **What Should NOT Be Extracted?**
Do **not** extract mentions that do not clearly refer to a dataset, including, but not limited to:
1. **Organizations & Institutions** (e.g., WHO, IMF, UNDP, "World Bank data" unless it explicitly refers to a dataset)
2. **Reports & Policy Documents** (e.g., "Fiscal Monitor by the IMF", "IEA Energy Report"; only extract if the dataset itself is referenced)
3. **Generic Mentions of Data** (e.g., "various sources", "survey results from multiple institutions")
4. **Economic Models & Policy Frameworks** (e.g., "GDP growth projections", "macroeconomic forecasts")
5. **Legislation & Agreements** (e.g., "Paris Agreement", "General Data Protection Regulation")

### **Rules for Extraction**
1. **Extract All Structured Data Mentions**
   - If the dataset is explicitly named (e.g., "Global Fishing Watch"), label it as `"properly_named"`.
   - If the dataset is described but not explicitly named (e.g., "electricity usage data from Albania"), label it as `"descriptive_but_unnamed"`.
   - If the dataset mention is too generic (e.g., "electricity usage data"), label it as `"vague_generic"`.

2. **Ensure `"data_type"` Is Always Assigned**
   - **Use an existing category if applicable.**
   - **If no suitable category exists, create a new `"data_type"` based on context.**

3. **Classify `"context"` Correctly**
   - `"primary"`: The dataset is used for direct analysis in the document.
   - `"supporting"`: The dataset is referenced to validate or compare findings.
   - `"background"`: The dataset is mentioned as general context or prior research.

   **Examples:**
   - `"The LSMS-ISA data is analyzed to assess the impact of agricultural practices on productivity."` → `"primary"`
   - `"Our results align with previous studies that used LSMS-ISA."` → `"supporting"`
   - `"LSMS-ISA is widely recognized as a reliable data source for agricultural research."` → `"background"`

4. **Capture Full Sentence Context**
   - The `"mentioned_in"` field must always include the **full sentence** where the dataset is referenced.
   - If a dataset is mistakenly extracted from an unrelated sentence, correct it.

### **Extraction Schema**
Each extracted dataset should have the following fields:
- `raw_name`: Exact dataset name from the text (**no paraphrasing**).
- `harmonized_name`: If properly named, use directly; if referenced in multiple ways, standardize using the most precise form in the text, otherwise, set this to None.
- `acronym`: Extract if explicitly mentioned.
- `mentioned_in`: **Full sentence** where the dataset appears (**no paraphrasing**).
- `context`: **primary / supporting / background**
- `specificity`: **properly_named / descriptive_but_unnamed / vague_generic**
- `relevance`: **directly_relevant / indirectly_relevant / not_relevant**
- `producer`: **Extract only if explicitly mentioned; otherwise, set to `None`.**
- `data_type`: **Assign based on existing categories, but create new ones if necessary.**

### **Handling New or Unlisted Data Types**
- If a dataset does not fit into existing categories, **infer an appropriate name** for its `"data_type"` based on context.
- Use a **general but informative label** for new data types (e.g., `"Climate Risk Data"`, `"Social Media Analytics"`).

### **Important: Do NOT Skip Unnamed Datasets**
If a dataset is described but lacks a proper name, extract it under `"descriptive_but_unnamed"` or `"vague_generic"`, whichever is appropriate.
If `"producer"` is not mentioned, set it to `None` rather than inferring."""

Creating Batches for Processing#

import os

# Directory for batch files & results
BATCH_DIR = "./openai-batchfiles/extraction"  # Directory for batch files
OUTPUT_DIR = "./extraction_outputs"  # Directory for OpenAI API responses
MODEL = "gpt-4o-mini"
# Ensure output folders exist
os.makedirs(BATCH_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
from tqdm.auto import tqdm


# Process Each PDF into Its Own Batch
def process_pdf_into_batch(pdf_fname, prompt, response_format, classifier, tokenizer):
    """Processes a single PDF file into an OpenAI batch file."""

    file_basename = os.path.basename(pdf_fname).replace(".pdf", "")
    batch_fname = os.path.join(BATCH_DIR, f"batch-{file_basename}.jsonl")

    paths = generate_file_paths(file_basename)

    if os.path.exists(batch_fname):
        print(f"Batch file {batch_fname} already exists. Skipping.")
        return batch_fname  # Skip if already processed

    pages = load_doc(pdf_fname, n_pages=1)

    seen_custom_ids = set()
    with open(batch_fname, "a+") as f:
        for page in tqdm(pages, desc=f"Processing {file_basename}"):
            raw_text = page.get("text")
            page_text = clean_extracted_text(raw_text)
            page_num = page.get("pages")[0]
            save_texts(raw_text, page_text, paths, page_num)

            if not page_text or not should_process_page(
                page_text, classifier, tokenizer
            ):
                continue

            custom_id = f"{file_basename}-page-{page_num}"
            if custom_id in seen_custom_ids:
                continue
            seen_custom_ids.add(custom_id)

            batch_entry = dict(
                custom_id=custom_id,
                method="POST",
                url="/v1/chat/completions",
                body=build_payload(page_text, prompt, response_format),
            )

            f.write(json.dumps(batch_entry) + "\n")

    print(f"Created batch file: {batch_fname}")
    return batch_fname

First, you need to create an input directory and put your PDF files there.

import glob

INPUT_DIRECTORY = "./input/"
os.makedirs(INPUT_DIRECTORY, exist_ok=True)

PDF_FILES = glob.glob(INPUT_DIRECTORY + "/*.pdf")
PDF_FILES
import re

prompt = DATA_USE_TASK_PROMPT
response_format = LabelledResponseFormat
# loop through each PDF file and process it into a batch
for pdf_fname in PDF_FILES:
    _ = process_pdf_into_batch(
        pdf_fname, prompt, response_format, classifier, tokenizer
    )
import os

# Directories
BATCH_DIR = "./openai-batchfiles/extraction"
MERGED_BATCH_DIR = "./batches"  # Where we save the consolidated batch files
MAX_REQUESTS_PER_BATCH = 15  # Number of requests per batch

# Ensure merged batch directory exists
os.makedirs(MERGED_BATCH_DIR, exist_ok=True)


def consolidate_batches(
    batch_dir=BATCH_DIR, batch_size=MAX_REQUESTS_PER_BATCH, process="extraction"
):
    """
    Consolidates JSONL batch files into larger ones while preserving structure.

    Args:
        batch_dir (str): Directory containing batch files.
        batch_size (int): Max requests per merged batch file.
        process (str): Subdirectory name under the merged batch directory (e.g., "extraction").

    Returns:
        list: List of saved merged batch file paths.
    """
    batch_files = [
        os.path.join(batch_dir, f)
        for f in os.listdir(batch_dir)
        if f.endswith(".jsonl")
    ]
    batch_files.sort()

    merged_batches = []
    current_batch = []
    batch_count = 1
    if process:
        os.makedirs(os.path.join(MERGED_BATCH_DIR, process), exist_ok=True)
    for batch_file in batch_files:
        try:
            with open(batch_file, "r") as f:
                batch_data = [
                    json.loads(line) for line in f if line.strip()
                ]  # Skip empty lines
        except json.JSONDecodeError as e:
            print(f"Skipping {batch_file} due to JSON decoding error: {e}")
            continue  # Skip corrupt files

        for entry in batch_data:
            current_batch.append(entry)

            # If batch reaches the specified size, save and reset
            if len(current_batch) >= batch_size:
                merged_batch_file = os.path.join(
                    MERGED_BATCH_DIR, f"{process}/merged-batch-{batch_count}.jsonl"
                )
                with open(merged_batch_file, "w") as f_out:
                    f_out.write(
                        "\n".join(json.dumps(item) for item in current_batch) + "\n"
                    )

                merged_batches.append(merged_batch_file)
                print(f"Created merged batch: {merged_batch_file}")

                # Reset for next batch
                current_batch = []
                batch_count += 1

    # Save any remaining batch data
    if current_batch:
        merged_batch_file = os.path.join(
            MERGED_BATCH_DIR, f"{process}/merged-batch-{batch_count}.jsonl"
        )
        with open(merged_batch_file, "w") as f_out:
            f_out.write("\n".join(json.dumps(item) for item in current_batch) + "\n")

        merged_batches.append(merged_batch_file)
        print(f"Created merged batch: {merged_batch_file}")

    return merged_batches
merged_batches = consolidate_batches(
    batch_dir=BATCH_DIR, batch_size=15, process="extraction"
)  # change your batch_size here (the OpenAI Batch API allows up to 50,000 requests per batch)
merged_batches

Submitting JSONL Batches to OpenAI#

After creating and merging the JSONL batches, the next step is to submit these batches to the OpenAI API for processing. The following code demonstrates how to upload and submit the merged batch files to OpenAI:

import time


def submit_batches_to_openai(batch_filenames):
    """
    Uploads and submits merged batch files to OpenAI.

    Args:
        batch_filenames (list): List of merged batch file paths.
    """
    # batch_input_files = []
    openai_batches = []

    for batch_fname in tqdm(batch_filenames, desc="Uploading merged batch files"):
        try:
            with open(batch_fname, "rb") as file:
                uploaded_file_id = client.files.create(file=file, purpose="batch").id

            # Submit batch job
            batch = client.batches.create(
                input_file_id=uploaded_file_id,
                endpoint="/v1/chat/completions",
                completion_window="24h",
                metadata={"description": f"Merged batch processing: {batch_fname}"},
            )

            openai_batches.append(batch)
            print(f"Submitted batch: {batch.id}")

        except Exception as e:
            print(f"Error processing {batch_fname}: {e}")

        time.sleep(1)

    return openai_batches
openai_batches = submit_batches_to_openai(merged_batches)

It may take a while for the batch process to complete.

You can continue checking until the status is ‘completed’.
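
If you prefer to poll programmatically instead of re-running the status cell, a simple sketch (the helper name and poll interval are arbitrary) is:

import time


def wait_for_batches(batches, poll_seconds=60):
    """Poll the Batch API until every submitted batch reaches a terminal status."""
    pending = {batch.id for batch in batches}
    while pending:
        for batch_id in list(pending):
            status = client.batches.retrieve(batch_id).status
            print(f"{batch_id}: {status}")
            if status in ("completed", "failed", "expired", "cancelled"):
                pending.discard(batch_id)
        if pending:
            time.sleep(poll_seconds)


# wait_for_batches(openai_batches)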

# Helper function to list all submitted batches and their statuses.
def list_batches():
    """
    Lists all submitted batches along with their statuses.
    """
    try:
        batches = client.batches.list()
        print("All Batch Jobs:")
        for batch in batches:
            print(
                f"Batch ID: {batch.id}, Status: {batch.status}, Created At: {batch.created_at}"
            )
    except Exception as e:
        print(f"Error listing batches: {e}")


# list_batches()
for batch in openai_batches:
    print(f"Batch ID: {batch.id}, Status: {batch.status}")
# once completed we can download the results
# for demo purposes, the following batch_ids are completed and ready for download

batch_ids = [
    "batch_67b4adcc7a248190b5c339d0fba3a727",
    "batch_67b4adc9d4c88190ab4eabdf6578552e",
]


def retrieve_batch_results(batch_ids, process="extraction"):
    if process:
        os.makedirs(f"./batches/{process}", exist_ok=True)
    for batch_id in batch_ids:
        batch_details = client.batches.retrieve(batch_id)
        if batch_details.status == "completed":
            result = client.files.content(batch_details.output_file_id).content
            results_file = f"./batches/{process}/results-{batch_id}.jsonl"
            with open(results_file, "wb") as file:
                file.write(result)
            print(f"saved results to {results_file}")
    return None


_ = retrieve_batch_results(batch_ids, process="extraction")
def load_and_save_outputs(batch_files):
    """Load batch result JSONL files into a single list of response records."""
    payload = []
    for batch_file in batch_files:
        # Loading data from saved file

        with open(batch_file, "r") as file:
            for line in file:
                # Parsing the JSON string into a dict and appending to the list of results
                json_object = json.loads(line.strip())
                payload.append(json_object)

    return payload
import glob

batch_files = glob.glob("./batches/extraction/results-*.jsonl")
payload = load_and_save_outputs(batch_files)
extraction_path = "./extraction_outputs/extraction"
raw_text_path = "./output/raw_text"
os.makedirs(extraction_path, exist_ok=True)
os.makedirs(raw_text_path, exist_ok=True)


def map_and_save_output(payload, extraction_path, raw_text_path):
    for res in payload:
        fname_origin = res["custom_id"].split("-page-")[0]
        page = int(res["custom_id"].split("-page-")[-1])
        page_str = str(page + 1)

        content = json.loads(
            res["response"]["body"]["choices"][0]["message"]["content"]
        )

        extraction_resfname = extraction_path + f"/{fname_origin}.json"
        if content.get("dataset") != []:
            with open(
                raw_text_path + f"/{fname_origin}.json", "r", encoding="utf-8"
            ) as raw_txt:
                raw_text = json.load(raw_txt).get("pages").get(page_str)
            save_output_per_document(raw_text, content, extraction_resfname, page)
    return None
_ = map_and_save_output(payload, extraction_path, raw_text_path)
# you can use the following function to inspect the json file
def inspect_json(json_file):
    try:
        with open(json_file, "r") as f:
            results = [json.loads(line) for line in f]
        return results
    except Exception:
        with open(json_file, "r", encoding="utf-8") as fn:
            return json.load(fn)

Inspect and check the output

json_file = (
    "./extraction_outputs/extraction/06c998e896785ab8b6d6caa4a8beb2f505c375a5.json"
)
inspect_json(json_file)

LLM-as-a-Judge for Quality Assessment#

After extracting dataset mentions, we validate the output using an LLM-as-a-judge pipeline. This involves:

  1. Validation Criteria: Assess dataset mentions as valid, invalid, or needing clarification.

  2. Consistency Check: Ensure consistent validity unless context changes.

  3. Context-Aware Inference: Extract missing details from the mentioned_in field.

  4. Data Type Classification: Infer appropriate data types from context.

  5. Producer Identification: Extract explicitly mentioned producers; otherwise, set to None.

Validation Criteria

A dataset is valid if:

  • Structured and systematically collected.

  • Reproducible, consisting of collected records.

Always Valid Datasets:

  • Government statistical and geospatial datasets.

  • Official surveys, administrative records, economic transaction data, and scientific research datasets.

Invalid Datasets:

  • Derived indicators or computational constructs.

  • Standalone statistical metrics without clear underlying data.

  • General organizations, reports, or methodologies.

Uncertain Cases:

  • Vaguely named but potentially valid: "Potentially valid—needs dataset name confirmation."

  • Too generic: "Needs clarification—dataset name is too generic."

Key Validation Rules

  1. Consistency Check: Maintain validity unless context changes.

  2. Context-Aware Inference: Extract missing details from the mentioned_in field.

  3. Data Type Classification: Infer appropriate data types from context.

  4. Producer Identification: Extract explicitly mentioned producers; otherwise, set to None.

Each dataset assessment must conform to the JudgeResponseFormat schema.

# Create a pydantic model for the judge response
from pydantic import model_validator


class JudgeDatasetEntry(BaseModel):
    raw_name: Optional[str] = Field(
        ..., description="The exact dataset name as it appears in the text."
    )
    harmonized_name: Optional[str] = Field(
        None, description="The standardized or full name of the dataset."
    )
    acronym: Optional[str] = Field(
        None, description="The short name or acronym associated with the dataset."
    )
    context: Context
    specificity: Specificity
    relevance: Relevance
    producer: Optional[str] = Field(
        None, description="The organization responsible for producing the dataset."
    )
    data_type: Optional[str] = Field(
        None, description="The type of data represented by the dataset."
    )
    year: Optional[str] = Field(
        None,
        description="The year associated with the dataset, if explicitly mentioned.",
    )
    valid: bool = Field(
        ..., description="True if the mention is valid, false otherwise."
    )
    invalid_reason: Optional[str] = Field(
        None, description="Reason why the mention was invalid (if applicable)."
    )
    sent: Optional[str] = Field(
        None, description="The exact sentence where the dataset is mentioned."
    )
    # entities: Optional[EmpiricalMention] = Field(None, description="Additional empirical context for the dataset.")

    # Validator to ensure valid and invalid_reason consistency
    @model_validator(mode="after")
    def check_validity(cls, instance):
        if not instance.valid and not instance.invalid_reason:
            raise ValueError("If 'valid' is False, 'invalid_reason' must be provided.")
        return instance


class JudgeDatasetGroup(BaseModel):
    mentioned_in: Optional[str] = Field(
        None, description="The exact text excerpt where the dataset is mentioned."
    )
    datasets: List[JudgeDatasetEntry] = Field(
        ..., description="A list of validated datasets mentioned in the paper."
    )


class JudgeResponseFormat(BaseModel):
    page_number: int = Field(..., description="The page number in the document.")
    dataset_used: bool = Field(
        ...,
        description="Flag indicating whether a valid dataset is mentioned in the page.",
    )
    data_mentions: List[JudgeDatasetGroup] = Field(
        ...,
        description="A list of structured dataset information mentioned in the paper.",
    )
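
As a quick, hypothetical check that the model validator enforces the valid/invalid_reason consistency rule:

# constructing an entry with valid=False but no invalid_reason should raise a validation error
try:
    JudgeDatasetEntry(
        raw_name="DHS 2019",
        context=Context.primary,
        specificity=Specificity.properly_named,
        relevance=Relevance.directly_relevant,
        valid=False,
    )
except ValueError as err:
    print(err)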
# judge prompt
JUDGE_PROMPT = """You are an expert in dataset validation. Your task is to assess whether each dataset mention is **valid, invalid, or requires clarification**, ensuring correctness and consistency based on the dataset's **empirical context**.

---

### **Dataset Validation Criteria**
A dataset is **valid** if:
1. **It is structured**—collected systematically for research, policy, or administrative purposes.
2. **It is reproducible**—meaning it consists of collected records rather than being derived purely from computations or models.

**Always Valid Datasets:**
- Government statistical and geospatial datasets (e.g., census, official land records).  
- Official surveys, administrative records, economic transaction data, and scientific research datasets.  

**Invalid Datasets:**
Set as invalid all `"raw_name"` that belong under the following classes.
- Derived indicators or computational constructs (e.g., "wealth score", "mine dummy", "district total production").  
- Standalone statistical metrics without a clear underlying dataset (e.g., "average income growth rate" without source data).  
- General organizations, reports, or methodologies (e.g., "World Bank", "UNDP Report", "machine learning model").  

**Uncertain Cases:**
- If a dataset is **vaguely named but potentially valid**, set it as valid but return: `"Potentially valid—needs dataset name confirmation."`  
- If a dataset reference is **too generic** (e.g., `"time-varying data on production"`), set it as valid but return: `"Needs clarification—dataset name is too generic."`  

---

### **Key Validation Rules**
1. **Consistency Check:**  
   - If a `"raw_name"` has been marked **valid earlier**, it **must remain valid** unless its meaning significantly differs in a new context.

2. **Context-Aware Inference:**  
   - If certain details are missing such as the **Year**, **Producer**, or **Data Type**, try to extract them from the `mentioned_in` field if available and correctly relate to the data.

3. **Data Type Classification (Flexible & Adaptive):**  
   - Infer the most appropriate `"data_type"` dynamically from context.  
   - Possible types: **Surveys, geospatial data, administrative records, financial reports, research datasets, climate observations, etc.**  
   - If **no predefined category fits**, create a **new `"data_type"` that best describes the dataset.**  

4. **Producer Identification:**  
   - If the **producer (organization/institution) is explicitly mentioned**, extract it.  
   - If not mentioned, **do not infer—set `"producer": None` instead.**

---

### **JudgeResponseFormat Schema**
Each dataset assessment must conform strictly to the JudgeResponseFormat schema."""
import os

# Directory for batch files & results
JUDGE_BATCH_DIR = "./openai-batchfiles/judge-batches"  # Directory for batch files
JUDGE_OUTPUT_DIR = "./extraction_outputs/judge"  # Directory for OpenAI API responses
MODEL = "gpt-4o-mini"
# Ensure output folders exist
os.makedirs(JUDGE_BATCH_DIR, exist_ok=True)
os.makedirs(JUDGE_OUTPUT_DIR, exist_ok=True)
def extracted_outputs_to_batch(extraction_fname, prompt, response_format):
    """Processes a single extraction file into a judge batch file."""

    file_basename = os.path.basename(extraction_fname).replace(".json", "")
    batch_fname = os.path.join(JUDGE_BATCH_DIR, f"judge-batch-{file_basename}.jsonl")

    if os.path.exists(batch_fname):
        print(f"Batch file {batch_fname} already exists. Skipping.")
        return batch_fname  # Skip if already processed

    with open(extraction_fname, "r") as f:
        extraction_data = json.load(f)

    for page_data in extraction_data["pages"]:
        page_num = page_data.get("page")
        data = page_data.get("data_mentions")

        if not data:
            continue

        custom_id = f"{file_basename}-page-{page_num}"
        batch_entry = dict(
            custom_id=custom_id,
            method="POST",
            url="/v1/chat/completions",
            body=build_payload(json.dumps(data), prompt, response_format),
        )

        with open(batch_fname, "a+") as f:
            f.write(json.dumps(batch_entry) + "\n")

    print(f"Created batch file: {batch_fname}")
    return batch_fname
EXTRACTION_DIRECTORY = "./extraction_outputs"
EXTRACTION_FILES = glob.glob(EXTRACTION_DIRECTORY + "/extraction/*.json")
judge_prompt = JUDGE_PROMPT
judge_response_format = JudgeResponseFormat
EXTRACTION_FILES
# loop through each PDF file and process it into a batch
for json_fname in EXTRACTION_FILES:
    _ = extracted_outputs_to_batch(json_fname, judge_prompt, judge_response_format)
merged_judge_batches = consolidate_batches(
    batch_dir=JUDGE_BATCH_DIR, batch_size=10, process="judge"
)
merged_judge_batches
openai_judge_batches = submit_batches_to_openai(merged_judge_batches)  # submit
for judge_batch in openai_judge_batches:
    print(judge_batch.id, client.batches.retrieve(judge_batch.id).status)
# for demo purposes, the following batch_ids are completed and ready for download
judge_batch_ids = [
    "batch_67b5eccdd0788190b90d2eda5a0eb580",
    "batch_67b5eccbca848190bf0d16ae8a8fa7ea",
]
_ = retrieve_batch_results(judge_batch_ids, process="judge")
batch_files = glob.glob("./batches/judge/results-*.jsonl")
judge_payload = load_and_save_outputs(batch_files)
judge_payload[0]
import os


def map_judge_output(judge_payload, judge_path):
    os.makedirs(judge_path, exist_ok=True)

    for res in judge_payload:
        fname_origin = res["custom_id"].split("-page-")[0]
        page_number = int(res["custom_id"].split("-page-")[1])
        content = json.loads(
            res["response"]["body"]["choices"][0]["message"]["content"]
        )

        extraction_resfname = os.path.join(judge_path, f"{fname_origin}.json")

        if content.get("data_mentions"):
            # Construct the page entry
            page_entry = {
                "page": page_number,
                "dataset_used": bool(content.get("data_mentions")),
                "data_mentions": content["data_mentions"],
            }

            # Read existing data
            if os.path.exists(extraction_resfname):
                with open(extraction_resfname, "r", encoding="utf-8") as f:
                    try:
                        existing_data = json.load(f)
                    except json.JSONDecodeError:
                        existing_data = {
                            "source": fname_origin,
                            "pages": [],
                        }  # Handle empty or corrupted file
            else:
                existing_data = {"source": fname_origin, "pages": []}

            # Ensure the existing structure is correct
            if "pages" not in existing_data:
                existing_data["pages"] = []

            # Append new page entry
            existing_data["pages"].append(page_entry)

            # Write back to file
            with open(extraction_resfname, "w", encoding="utf-8") as f:
                json.dump(existing_data, f, indent=4, ensure_ascii=False)

    return None
judge_path = "./extraction_outputs/judge"
_ = map_judge_output(judge_payload, judge_path)

Autonomous Reasoning Agent#

Once the information is validated by the judge LLM, we use an autonomous reasoning agent to further refine and validate the extracted data. The reasoning agent follows a structured prompt that walks through an explicit strategy, analysis, and devil's advocate review to confirm the accuracy and relevance of each dataset mention.

THINKING_PROMPT = """Your task is to review a structured user input that may mention a dataset in a text. Please take your time.

Carefully analyze what the text in the `mentioned_in` field explicitly means and in what context the `raw_name` is discussed. Never infer, imply, or assume; rely exclusively on the text as facts. If there are multiple datasets, assess each one individually.

Plan a strategy to ensure you can maximize the chances of correctly judging and classifying whether the provided input:
- Clearly, the `raw_name` falls under the concept of a data/dataset and not by extension or implicitly.
- Whether the raw_name is actually in the `mentioned_in`.
- Whether the harmonized_name (if present) is actually in the `mentioned_in`. If not found, remove it from the output.
- The `raw_name` is `properly_named` (e.g., DHS, LSMS, etc.), `descriptive_but_unnamed` (administrative school records in Ghana for 2020), or `vague_generic` (a survey data). Any of these are valid data mentions. To be sure, elaborate how you interpret these classes and use that for classifying.
- The context concerning usage of the dataset is mentioned: is it `primary`, `supporting`, or `background`.

Then, write down your strategy.

After you write down your strategy, synthesize it to develop a rubric of what qualifies as a dataset, which you must use to base your judgment.

Incorporate a devil's advocate review as part of your strategy. If the review shows inconsistency, update accordingly. Do not reason based on assumption, inference, or implicit thinking.  Relationships do not count as a dataset; for example, the producer is not a dataset.

Execute the strategy, **step by step**, and write an analysis of how you interpret the `raw_name` in the context of the `mentioned_in`.

If your analysis results in the `raw_name` being a dataset, set the `valid` field to `true`, otherwise, set it to `false`. In both cases, return the result of your analysis focusing on the `raw_name` in the `reason` field. If it is invalid, set the `specificity` and `context` to null.

ALWAYS WRITE A DEVIL'S ADVOCATE REVIEW AFTER THE ANALYSIS BEFORE CONCLUDING.

After you write your analysis, your output must repeat the input with the `specificity`, `context`, `valid` and `invalid_reason` values replaced accordingly in the same level as the corresponding `raw_name`. IMPORTANT: the final output must be between these tags <OUTPUTDATA>```json<the output must be here>```</OUTPUTDATA>"""
# we are only interested in valid mentions of data from the judge llm
def filter_valid_mentions(validated_input):
    # Filter out invalid datasets before passing to LLM
    filtered_mentions = []
    for page in validated_input["pages"]:
        for mention in page.get("data_mentions", []):
            valid_datasets = [
                dataset
                for dataset in mention["datasets"]
                if dataset.get("valid", False)
            ]

            if valid_datasets:  # Only keep mentions with at least one valid dataset
                filtered_mentions.append(
                    {
                        "mentioned_in": mention["mentioned_in"],
                        "datasets": valid_datasets,
                        "page": page["page"],
                        "dataset_used": page["dataset_used"],
                    }
                )

    input_data = {
        "source": validated_input.get("source"),
        "data_mentions": filtered_mentions,
    }

    return input_data
def prepare_input_data(data):
    for mention in data.get("data_mentions", []):
        for ds in mention.get("datasets", []):
            # Replace string "None" with actual None
            if ds.get("producer") == "None":
                ds["producer"] = None

            # Remove unwanted keys
            keys_to_remove = [
                "sent",
                "specificity",
                "context",
                "relevance",
                "data_type",
                "valid",
                "invalid_reason",
            ]
            for key in keys_to_remove:
                ds.pop(key, None)  # `None` as default to avoid KeyError
    return data
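
A quick, hypothetical run of the two helpers above on a minimal judge-style structure (all values are made up):

toy_judge_output = {
    "source": "example-doc",
    "pages": [
        {
            "page": 3,
            "dataset_used": True,
            "data_mentions": [
                {
                    "mentioned_in": "We analyze the DHS 2019 survey collected by the national statistics office.",
                    "datasets": [
                        {
                            "raw_name": "DHS 2019",
                            "valid": True,
                            "producer": "None",
                            "sent": "We analyze the DHS 2019 survey.",
                        },
                        {
                            "raw_name": "World Bank",
                            "valid": False,
                            "invalid_reason": "Organization, not a dataset.",
                        },
                    ],
                }
            ],
        }
    ],
}
# only the valid DHS entry survives; the string "None" producer is normalized and helper keys are dropped
print(json.dumps(prepare_input_data(filter_valid_mentions(toy_judge_output)), indent=2))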
import os

# Directory for batch files & results
REASONING_BATCH_DIR = (
    "./openai-batchfiles/reasoning-batches"  # Directory for batch files
)
REASONING_OUTPUT_DIR = (
    "./extraction_outputs/reasoning"  # Directory for OpenAI API responses
)
MODEL = "gpt-4o-mini"
# Ensure output folders exist
os.makedirs(REASONING_BATCH_DIR, exist_ok=True)
os.makedirs(REASONING_OUTPUT_DIR, exist_ok=True)
from hashlib import md5


def judge_outputs_to_batch(judge_fname, prompt):
    """Processes a single judge file into a reasoning batch file."""

    file_basename = os.path.basename(judge_fname).replace(".json", "")
    batch_fname = os.path.join(
        REASONING_BATCH_DIR, f"reasoning-batch-{file_basename}.jsonl"
    )

    if os.path.exists(batch_fname):
        print(f"Batch file {batch_fname} already exists. Skipping.")
        return batch_fname  # Skip if already processed

    with open(judge_fname, "r") as f:
        judge_data = json.load(f)

    filtered_data = filter_valid_mentions(judge_data)
    prepared_data = prepare_input_data(filtered_data)

    for page_data in prepared_data["data_mentions"]:
        mention_text = page_data.get("mentioned_in")
        if not mention_text:
            continue
        mhash = md5(mention_text.encode()).hexdigest()[:8]
        custom_id = f"{file_basename}_{mhash}-pg-{page_data['page']}"
        batch_entry = dict(
            custom_id=custom_id,
            method="POST",
            url="/v1/chat/completions",
            body=build_payload(json.dumps(page_data), prompt),
        )

        with open(batch_fname, "a+") as f:
            f.write(json.dumps(batch_entry) + "\n")

    print(f"Created batch file: {batch_fname}")
    return batch_fname
EXTRACTION_DIRECTORY = "./extraction_outputs"
JUDGE_FILES = glob.glob(EXTRACTION_DIRECTORY + "/judge/*.json")
reasoning_prompt = THINKING_PROMPT
# loop through each PDF file and process it into a batch
for json_fname in JUDGE_FILES:
    _ = judge_outputs_to_batch(json_fname, reasoning_prompt)
openai_reasoning_batches = consolidate_batches(
    batch_dir=REASONING_BATCH_DIR, batch_size=15, process="reasoning"
)
openai_reasoning_batches = submit_batches_to_openai(openai_reasoning_batches)
for reasoning_batch in openai_reasoning_batches:
    print(client.batches.retrieve(reasoning_batch.id).status, reasoning_batch.id)
# openai_reasoning_batches = [batch.id for batch in openai_reasoning_batches]
# for demo purposes, the following batch_ids are completed and ready for download
reasoning_batch_ids = [
    "batch_67b6c639468c8190b87726743431e76f",
    "batch_67b6c63b48848190be8cfc7fa018300c",
]
_ = retrieve_batch_results(reasoning_batch_ids, process="reasoning")
batch_files_reasoning = glob.glob("./batches/reasoning/results-*.jsonl")
reasoning_payload = load_and_save_outputs(batch_files_reasoning)
#!pip install beautifulsoup4
from bs4 import BeautifulSoup


def map_reasoning_output(reasoning_payload, reasoning_path):
    os.makedirs(reasoning_path, exist_ok=True)

    for res in reasoning_payload:
        fname_origin = res["custom_id"].split("_")[0]
        content = res["response"]["body"]["choices"][0]["message"]["content"]

        reasoning_fname = os.path.join(reasoning_path, f"{fname_origin}.json")

        # The reasoning prompt wraps its answer in <OUTPUTDATA>```json ... ```</OUTPUTDATA> tags;
        # strip the tags first (if present), then pull out the fenced JSON block.
        if "<outputdata>" in content.lower() and "</outputdata>" in content.lower():
            soup = BeautifulSoup(content, "html.parser")
            content = soup.find("outputdata").text

        content = content[content.index("```json") + len("```json") :]
        content = content[: content.index("```")]

        content = json.loads(content)
        # Construct the page entry
        filtered_data = []
        filtered_data.append(
            {
                "mentioned_in": content["mentioned_in"],
                "page": content["page"],
                "dataset_used": content["dataset_used"],
                "datasets": [
                    dataset
                    for dataset in content.get("datasets")
                    if dataset.get("valid", False)
                ],
            }
        )
        page_entry = {
            "data_mentions": filtered_data  # we keep only valid: true entries
        }

        if os.path.exists(reasoning_fname):
            with open(reasoning_fname, "r", encoding="utf-8") as f:
                try:
                    existing_data = json.load(f)
                except json.JSONDecodeError:
                    existing_data = {
                        "source": fname_origin,
                        "pages": [],
                    }  # Handle empty or corrupted file
        else:
            existing_data = {"source": fname_origin, "pages": []}

        # Ensure the existing structure is correct
        if "pages" not in existing_data:
            existing_data["pages"] = []

        # Append new page entry
        existing_data["pages"].append(page_entry)

        # Write back to file
        with open(reasoning_fname, "w", encoding="utf-8") as f:
            json.dump(existing_data, f, indent=4, ensure_ascii=False)

    return None
reasoning_path = "./extraction_outputs/reasoning"
_ = map_reasoning_output(reasoning_payload, reasoning_path)
inspect_json(
    "./extraction_outputs/reasoning/The-local-socioeconomic-effects-of-gold-mining-evidence-from-Ghana.json"
)

That’s it! Now you can make your fine-tuning dataset. This notebook has walked you through a comprehensive batch processing implementation for data labeling of climate change-related documents. By leveraging various tools and libraries, we’ve automated and streamlined the data labeling process.

To build your fine-tuning dataset, you can check this notebook to learn more.
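
As a starting point, here is a minimal, hypothetical sketch of reshaping the consolidated reasoning outputs into chat-format fine-tuning examples; the exact target format is an assumption, so adapt it to your training setup.

def build_finetuning_examples(reasoning_dir="./extraction_outputs/reasoning"):
    """Turn each validated mention into a (system, user, assistant) training example."""
    examples = []
    for fname in glob.glob(os.path.join(reasoning_dir, "*.json")):
        with open(fname, "r", encoding="utf-8") as f:
            doc = json.load(f)
        for page in doc.get("pages", []):
            for mention in page.get("data_mentions", []):
                examples.append(
                    {
                        "messages": [
                            {"role": "system", "content": DATA_USE_TASK_PROMPT},
                            {"role": "user", "content": mention["mentioned_in"]},
                            {
                                "role": "assistant",
                                "content": json.dumps({"dataset": mention["datasets"]}),
                            },
                        ]
                    }
                )
    return examples


# with open("finetuning-data.jsonl", "w", encoding="utf-8") as f:
#     for example in build_finetuning_examples():
#         f.write(json.dumps(example) + "\n")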