Data Provider Scripts¶
Overview¶
This script defines a typed data provider that reads signal data from local
Parquet files and exposes it through a consistent API used by dFL.
The get_provider() function returns a dictionary-like structure
(DataCoordinatorInfo) that bundles both the dataset's metadata and callable
functions for data retrieval.
The key function, fetch_data(), handles time-filtered reads using pandas'
read_parquet() interface and returns results in a standardized format.
Each record in the output includes the raw signal array, timestamps, and
lightweight metadata. Signals that fail to load are listed under
errored_signals, ensuring downstream applications can handle missing data
gracefully.
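For illustration, a successful call returns a payload shaped like the sketch below; the record ID and signal names (temperature, humidity) are hypothetical stand-ins for whatever columns actually exist in the Parquet files.
# Hypothetical payload returned by fetch_data(): "temperature" was found in the
# file, while "humidity" was requested but missing, so it lands in errored_signals.
import numpy as np

example_result = {
    "id": "station_04_2021",  # hypothetical record_id (the Parquet file stem)
    "signals": [
        {
            "data": np.array([21.4, 21.9, 22.3]),  # raw signal values
            "data_name": "temperature",
            "times": np.array(
                ["2021-06-01T00:00", "2021-06-01T01:00", "2021-06-01T02:00"],
                dtype="datetime64[ns]",
            ),
            "units": {},
            "dims": ("times",),
            "errored": False,
        }
    ],
    "errored_signals": ["humidity"],  # requested but not present in the file
}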
Together, fetch_record_ids_for_dataset_id() and fetch_data() make it easy to
iterate over multiple file-based datasets without hard-coding paths or schema
assumptions.
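As a rough usage sketch, assuming the example provider below is saved as weather_provider.py (the module name is an assumption, not part of dFL), the whole dataset can be walked like this:
# Sketch: enumerate every record and fetch the first available signal from each.
from weather_provider import get_provider  # assumed module name

provider = get_provider(None)
record_ids = provider["fetch_record_ids_for_dataset_id"](provider["data_folder"])

for record_id in record_ids:
    result = provider["fetch_data"](
        provider["data_folder"],
        provider["dataset_id"],
        record_id,
        signals=provider["all_possible_signals"][:1],  # just the first signal
        global_data_params=None,
    )
    print(record_id, [s["data_name"] for s in result["signals"]], result["errored_signals"])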
This modular, type-annotated structure allows the provider to be easily integrated into labeling GUIs or analytic pipelines, while remaining extensible for other back-ends such as remote APIs, experimental archives, or simulation outputs.
Below is a simple data provider for vanilla ingestion of weather data; details of the weather dataset can be found in DEMOS. For more complicated or customized data providers, see Utility Scripts and/or how to automate the generation of data providers from LLM Prompts.
Example Implementation¶
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, TypedDict
import numpy as np
import pandas as pd
from numpy.typing import NDArray
# ---- Typed return payloads ---------------------------------------------------
class SignalRecord(TypedDict):
    data: NDArray[Any]   # e.g., float, int; stays generic
    data_name: str
    times: NDArray[Any]  # typically datetime64[ns, UTC]
    units: Dict[str, str]
    dims: Tuple[str, ...]
    errored: bool


class FetchDataResult(TypedDict):
    id: str
    signals: List[SignalRecord]
    errored_signals: List[str]


class DataCoordinatorInfo(TypedDict, total=False):
    # Core data access
    fetch_data: Any  # callable; leaving as Any to avoid recursive types
    dataset_id: str
    fetch_record_ids_for_dataset_id: Any

    # Schema / metadata
    all_possible_signals: List[str]
    is_date: bool
    data_folder: Path

    # Processing / analysis options
    custom_smoothing_options: Optional[Any]
    custom_normalizing_options: Optional[Any]
    trim_data: Optional[Any]

    # Labeling / classification
    auto_label_function_dictionary: Optional[Dict[str, Any]]
    all_labels: List[str]

    # Visualization / UI
    custom_grapher_dictionary: Optional[Dict[str, Any]]
    layout_options: Optional[Any]
    default_trim_1: Optional[str]
    default_trim_2: Optional[str]
def get_provider(_: Any) -> DataCoordinatorInfo:
    def fetch_record_ids_for_dataset_id(data_folder: Path, _unused: Optional[Any] = None) -> List[str]:
        return [file.stem for file in data_folder.glob("*.parquet")]

    def fetch_data(
        data_folder: Path,
        dataset_id: str,
        record_id: Optional[str],
        signals: Optional[List[str]],
        global_data_params: Optional[Dict[str, Any]],
        data_trim_1: Optional[str] = None,
        data_trim_2: Optional[str] = None,
    ) -> FetchDataResult | List[Any]:
        if record_id is None:
            return []  # sentinel: no record selected

        # Default signals: first available if none provided
        if signals is None:
            signals = [all_possible_signals[0]] if all_possible_signals else []

        path: Path = Path(data_folder) / f"{record_id}.parquet"

        # Build UTC filters (pyarrow-style)
        filters: List[Tuple[str, str, pd.Timestamp]] = []
        if data_trim_1 is not None:
            filters.append(("time", ">=", pd.to_datetime(data_trim_1, utc=True)))
        if data_trim_2 is not None:
            filters.append(("time", "<=", pd.to_datetime(data_trim_2, utc=True)))

        # Read data with filters
        try:
            df: pd.DataFrame = pd.read_parquet(path, filters=filters if filters else None)
        except Exception as e:
            print(f"Error reading parquet with filters: {e}")
            return {"id": record_id, "signals": [], "errored_signals": signals}

        times: NDArray[Any] = df.index.to_numpy()
        data_array: List[SignalRecord] = []
        errored_signals: List[str] = []

        for signal in signals:
            if signal in df.columns:
                signal_data: NDArray[Any] = df[signal].to_numpy()
                record: SignalRecord = {
                    "data": signal_data,
                    "data_name": signal,
                    "times": times,
                    "units": {},
                    "dims": ("times",),
                    "errored": False,
                }
                data_array.append(record)
            else:
                errored_signals.append(signal)

        return {"id": record_id, "signals": data_array, "errored_signals": errored_signals}

    # --- Module initialization: discover records and available signals ---
    data_folder: Path = Path(__file__).parent / "weather_data"
    existing_record_ids: List[str] = fetch_record_ids_for_dataset_id(data_folder)
    if not existing_record_ids:
        raise IndexError(f"No parquet files found in {data_folder}, existing_record_ids is empty.")

    # Extract available signals from the first data file
    first_record: str = existing_record_ids[0]
    file_path: Path = Path(data_folder) / f"{first_record}.parquet"
    first_df: pd.DataFrame = pd.read_parquet(file_path)
    all_possible_signals: List[str] = first_df.columns.to_list()
    all_possible_signals.append("example_nonexistent_data")  # deliberately missing, to exercise errored_signals

    data_coordinator_info: DataCoordinatorInfo = {
        # Core data access functions
        "fetch_data": fetch_data,
        "dataset_id": "weather_data_minimal_typed",
        "fetch_record_ids_for_dataset_id": fetch_record_ids_for_dataset_id,
        # Data schema and metadata
        "all_possible_signals": all_possible_signals,
        "is_date": True,  # indicates datetime-indexed data
        "data_folder": data_folder,
        # Processing and analysis options
        "custom_smoothing_options": None,
        "custom_normalizing_options": None,
        "trim_data": None,
        # Labeling and classification
        "auto_label_function_dictionary": None,
        "all_labels": ["Peak", "Dip", "Anomaly"],
        # Visualization and UI
        "custom_grapher_dictionary": None,
        "layout_options": None,
        "default_trim_1": None,
        "default_trim_2": None,
    }
    return data_coordinator_info
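Example Usage¶
For reference, the provider could also be exercised with a UTC time window; the weather_provider module name and the date strings below are illustrative assumptions, not part of dFL's documented API.
# Hypothetical time-trimmed fetch: restrict the read to a UTC window and show
# how a missing signal surfaces in errored_signals.
from weather_provider import get_provider  # assumed module name

provider = get_provider(None)
record_ids = provider["fetch_record_ids_for_dataset_id"](provider["data_folder"])

result = provider["fetch_data"](
    provider["data_folder"],
    provider["dataset_id"],
    record_ids[0],
    signals=provider["all_possible_signals"],  # includes "example_nonexistent_data"
    global_data_params=None,
    data_trim_1="2021-01-01T00:00:00Z",  # assumed window start
    data_trim_2="2021-01-31T23:59:59Z",  # assumed window end
)

for record in result["signals"]:
    print(record["data_name"], record["data"].shape)
print("errored:", result["errored_signals"])  # e.g., ["example_nonexistent_data"]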