
Data Provider Scripts

Overview

This script defines a typed data provider that reads signal data from local Parquet files and exposes it through a consistent API used by dFL.
The get_provider() function returns a typed dictionary (DataCoordinatorInfo) that bundles the dataset's metadata with callable functions for data retrieval.
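As a quick sketch of how a consumer might load it (the module name weather_provider is an assumption here; it depends on where the script below is saved):

import weather_provider

provider = weather_provider.get_provider(None)   # the argument is unused by this provider
print(provider["dataset_id"])                    # "weather_data_minimal_typed"
print(provider["all_possible_signals"])          # column names discovered from the first Parquet file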

The key function, fetch_data(), handles time-filtered reads using pandas' read_parquet() interface and returns results in a standardized format.
Each record in the output includes the raw signal array, its timestamps, and lightweight metadata. Signals that fail to load are listed under errored_signals, so downstream applications can handle missing data gracefully.
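For example, a time-windowed fetch and the handling of missing signals could look like this (a sketch reusing the provider object from above; the record id and signal names are illustrative, not part of the dataset):

result = provider["fetch_data"](
    data_folder=provider["data_folder"],
    dataset_id=provider["dataset_id"],
    record_id="2023_station_01",            # illustrative record id
    signals=["temperature", "humidity"],    # illustrative signal names
    global_data_params=None,
    data_trim_1="2023-01-01T00:00:00Z",     # inclusive lower bound on the time index
    data_trim_2="2023-01-31T23:59:59Z",     # inclusive upper bound
)
for record in result["signals"]:
    print(record["data_name"], record["data"].shape)
for missing in result["errored_signals"]:
    print("could not load:", missing)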

Together, fetch_record_ids_for_dataset_id() and fetch_data() make it easy to iterate over multiple file-based datasets without hard-coding paths or schema assumptions.
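A sketch of that iteration, again using only the callables and metadata stored in the provider dictionary:

record_ids = provider["fetch_record_ids_for_dataset_id"](provider["data_folder"])
for record_id in record_ids:
    result = provider["fetch_data"](
        provider["data_folder"],
        provider["dataset_id"],
        record_id,
        provider["all_possible_signals"],   # includes one deliberately missing signal (see the code below)
        None,
    )
    print(record_id, len(result["signals"]), "loaded,", len(result["errored_signals"]), "missing")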

This modular, type-annotated structure allows the provider to be easily integrated into labeling GUIs or analytic pipelines, while remaining extensible for other back-ends such as remote APIs, experimental archives, or simulation outputs.

Below is a simple data provider for vanilla ingestion of weather data; details of the weather dataset can be found in DEMOS. For more complex or customized data providers, see Utility Scripts, or see LLM Prompts for how to automate the generation of data providers.

Example Implementation

from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, TypedDict

import numpy as np
import pandas as pd
from numpy.typing import NDArray


# ---- Typed return payloads ---------------------------------------------------

class SignalRecord(TypedDict):
    data: NDArray[Any]                # e.g., float, int; stays generic
    data_name: str
    times: NDArray[Any]               # typically datetime64[ns, UTC]
    units: Dict[str, str]
    dims: Tuple[str, ...]
    errored: bool

class FetchDataResult(TypedDict):
    id: str
    signals: List[SignalRecord]
    errored_signals: List[str]

class DataCoordinatorInfo(TypedDict, total=False):
    # Core data access
    fetch_data: Any                   # callable; leaving as Any to avoid recursive types
    dataset_id: str
    fetch_record_ids_for_dataset_id: Any

    # Schema / metadata
    all_possible_signals: List[str]
    is_date: bool
    data_folder: Path

    # Processing / analysis options
    custom_smoothing_options: Optional[Any]
    custom_normalizing_options: Optional[Any]
    trim_data: Optional[Any]

    # Labeling / classification
    auto_label_function_dictionary: Optional[Dict[str, Any]]
    all_labels: List[str]

    # Visualization / UI
    custom_grapher_dictionary: Optional[Dict[str, Any]]
    layout_options: Optional[Any]
    default_trim_1: Optional[str]
    default_trim_2: Optional[str]


def get_provider(_: Any) -> DataCoordinatorInfo:
    def fetch_record_ids_for_dataset_id(data_folder: Path, _unused: Optional[Any] = None) -> List[str]:
        return [file.stem for file in data_folder.glob("*.parquet")]

    def fetch_data(
        data_folder: Path,
        dataset_id: str,
        record_id: Optional[str],
        signals: Optional[List[str]],
        global_data_params: Optional[Dict[str, Any]],
        data_trim_1: Optional[str] = None,
        data_trim_2: Optional[str] = None,
    ) -> FetchDataResult | List[Any]:
        if record_id is None:
            return []  # sentinel: no record selected, so there is nothing to read

        # default signals: first available if none provided
        if signals is None:
            signals = [all_possible_signals[0]] if all_possible_signals else []

        path: Path = Path(data_folder) / f"{record_id}.parquet"

        # Build UTC filters (pyarrow-style)
        filters: List[Tuple[str, str, pd.Timestamp]] = []
        if data_trim_1 is not None:
            filters.append(("time", ">=", pd.to_datetime(data_trim_1, utc=True)))
        if data_trim_2 is not None:
            filters.append(("time", "<=", pd.to_datetime(data_trim_2, utc=True)))

        # Read data with filters
        try:
            df: pd.DataFrame = pd.read_parquet(path, filters=filters if filters else None)
        except Exception as e:
            print(f"Error reading parquet with filters: {e}")
            return {"id": record_id, "signals": [], "errored_signals": signals}

        times: NDArray[Any] = df.index.to_numpy()
        data_array: List[SignalRecord] = []
        errored_signals: List[str] = []

        for signal in signals:
            if signal in df.columns:
                signal_data: NDArray[Any] = df[signal].to_numpy()
                record: SignalRecord = {
                    "data": signal_data,
                    "data_name": signal,
                    "times": times,
                    "units": {},
                    "dims": ("times",),
                    "errored": False,
                }
                data_array.append(record)
            else:
                errored_signals.append(signal)

        return {"id": record_id, "signals": data_array, "errored_signals": errored_signals}

    # --- Provider initialization (runs once per get_provider() call) ---
    data_folder: Path = Path(__file__).parent / "weather_data"
    existing_record_ids: List[str] = fetch_record_ids_for_dataset_id(data_folder)

    if not existing_record_ids:
        raise IndexError(f"No parquet files found in {data_folder}, existing_record_ids is empty.")

    # Extract available signals from the first data file
    first_record: str = existing_record_ids[0]
    file_path: Path = Path(data_folder) / f"{first_record}.parquet"
    first_df: pd.DataFrame = pd.read_parquet(file_path)
    all_possible_signals: List[str] = first_df.columns.to_list()
    # Deliberately include a signal that does not exist so the errored_signals path is exercised
    all_possible_signals.append("example_nonexistent_data")

    data_coordinator_info: DataCoordinatorInfo = {
        # Core data access functions
        "fetch_data": fetch_data,
        "dataset_id": "weather_data_minimal_typed",
        "fetch_record_ids_for_dataset_id": fetch_record_ids_for_dataset_id,

        # Data schema and metadata
        "all_possible_signals": all_possible_signals,
        "is_date": True,  # Indicates datetime-indexed data
        "data_folder": data_folder,

        # Processing and analysis options
        "custom_smoothing_options": None,
        "custom_normalizing_options": None,
        "trim_data": None,

        # Labeling and classification
        "auto_label_function_dictionary": None,
        "all_labels": ["Peak", "Dip", "Anomaly"],

        # Visualization and UI
        "custom_grapher_dictionary": None,
        "layout_options": None,
        "default_trim_1": None,
        "default_trim_2": None,
    }
    return data_coordinator_info
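
As noted above, the same DataCoordinatorInfo contract can wrap other back-ends. The sketch below is one hypothetical variant, assuming a folder of CSV files with a "time" column; the folder name, column name, and dataset id are assumptions, and the imports and TypedDict definitions from the example above are reused. Only the two callables change; everything downstream keeps calling the same entries in the dictionary.

def get_csv_provider(_: Any) -> DataCoordinatorInfo:
    # Hypothetical CSV-backed variant; folder name and "time" column are assumptions.
    data_folder: Path = Path(__file__).parent / "weather_data_csv"

    def fetch_record_ids_for_dataset_id(folder: Path, _unused: Optional[Any] = None) -> List[str]:
        return [file.stem for file in folder.glob("*.csv")]

    def fetch_data(
        folder: Path,
        dataset_id: str,
        record_id: Optional[str],
        signals: Optional[List[str]],
        global_data_params: Optional[Dict[str, Any]],
        data_trim_1: Optional[str] = None,
        data_trim_2: Optional[str] = None,
    ) -> FetchDataResult | List[Any]:
        if record_id is None:
            return []

        df = pd.read_csv(folder / f"{record_id}.csv", index_col="time", parse_dates=["time"])
        df.index = pd.to_datetime(df.index, utc=True)  # force a UTC index so the trims below compare cleanly

        # Same inclusive time trims as the Parquet example, applied in pandas after the read
        if data_trim_1 is not None:
            df = df[df.index >= pd.to_datetime(data_trim_1, utc=True)]
        if data_trim_2 is not None:
            df = df[df.index <= pd.to_datetime(data_trim_2, utc=True)]

        times = df.index.to_numpy()
        requested = signals or []
        found: List[SignalRecord] = [
            {"data": df[s].to_numpy(), "data_name": s, "times": times,
             "units": {}, "dims": ("times",), "errored": False}
            for s in requested if s in df.columns
        ]
        missing = [s for s in requested if s not in df.columns]
        return {"id": record_id, "signals": found, "errored_signals": missing}

    return {
        "fetch_data": fetch_data,
        "dataset_id": "weather_data_csv",
        "fetch_record_ids_for_dataset_id": fetch_record_ids_for_dataset_id,
        "all_possible_signals": [],  # could be discovered from the first CSV, as in the Parquet example
        "is_date": True,
        "data_folder": data_folder,
    }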