
Integrating Custom Autolabelers into dFL

dFL offers a flexible framework for detecting and labeling temporal events in time-series data. Its modular design allows researchers to register custom autolabeling functions with a central coordinator that manages data access, parameter handling, and result aggregation.

Custom autolabelers can run in either single-shot or bulk processing modes, enabling domain-specific algorithms to be applied across one or many datasets at once.


Core Workflow

The autolabeling workflow is organized around three key components:

  1. Data Coordinator – provides a unified interface to datasets and abstracts away data access patterns.
  2. Registry of Functions – holds domain-specific autolabelers that implement event detection logic.
  3. Callback System – links results to the dFL web interface, supporting user interaction in real time.

Each autolabeler follows a standardized Python signature that accepts:

  • Dataset identifiers
  • Record (shot) IDs
  • A data coordinator reference
  • Additional user-defined parameters

dFL automatically validates inputs, fetches data, and formats results, so researchers can focus on writing detection logic rather than on infrastructure details.
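As a minimal sketch of that signature (the function name here is hypothetical; the parameter names mirror the full examples later on this page), a do-nothing autolabeler looks like this:

=== "Python"

def perform_my_autolabeling(
    dataset_id, shot_id, data_coordinator, additional_parameters, trim_1=None, trim_2=None
):
    # Bulk runs may pass None for missing shots; return no labels in that case.
    if shot_id is None:
        return []
    # Ask the coordinator for just the signals this autolabeler needs.
    shot_data = data_coordinator.fetch_data_async(
        data_coordinator.data_folder, dataset_id, shot_id, ["Temperature"], {},
        trim_1=trim_1, trim_2=trim_2,
    )
    # Detection logic goes here: return one dictionary per detected event,
    # each carrying every label in data_coordinator.all_labels plus T1/T2 bounds.
    return []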


Integrating Custom Autolabelers

Custom autolabelers can be as simple as signal-processing routines (e.g., threshold crossing or peak finding) or as advanced as machine learning classifiers.
- A threshold autolabeler might mark an event whenever signal values exceed a user-defined limit.
- A peak-detection autolabeler can identify local maxima in a temperature signal using scipy.signal.find_peaks.

Below is an example of defining both a threshold and a peak-detection autolabeler. The registry that holds them (autolabeling_dictionary) is first wired into the data coordinator configuration (data_coordinator_info):

=== "Python"

from auto_labeling import autolabeling_dictionary

# --- Final Provider Assembly ---
data_coordinator_info = {
    "fetch_data": fetch_data,
    "dataset_id": "weather_data_2",
    "fetch_shot_ids_for_dataset_id": fetch_shot_ids_for_dataset_id,
    "all_possible_signals": all_possible_signals,
    "custom_smoothing_options": custom_smoothing_dictionary,
    "custom_normalizing_options": custom_normalizing_dictionary,
    "spline_path": "spline_parameters.csv",
    # Registry of custom autolabelers, defined below.
    "auto_label_function_dictionary": autolabeling_dictionary,
    # Every autolabeler must return an entry for each of these labels.
    "all_labels": ["Peak", "Dip", "Anomaly"],
    "custom_grapher_dictionary": custom_grapher_dictionary,
    "is_date": True,
    "trim_data": trim_data,
    "data_folder": data_folder,
    "layout_options": layout_options
}

The autolabelers themselves are then defined and registered in autolabeling_dictionary, which the callback system uses to expose them in the web interface:

=== "Python"

import pandas as pd
from scipy.signal import find_peaks

def perform_threshold_autolabeling(
    dataset_id, shot_id, data_coordinator, additional_parameters, trim_1=None, trim_2=None
):
    print(f"autolabeling {shot_id}")
    if shot_id is None:
        return []
    # User parameters arrive keyed as "<autolabeler name>_<parameter name>".
    signals_to_autolabel = additional_parameters["threshold_signals"]
    threshold = additional_parameters["threshold_autolabel_threshold"]
    shot_data = data_coordinator.fetch_data_async(
        data_coordinator.data_folder, dataset_id, shot_id, signals_to_autolabel, {}, trim_1=trim_1, trim_2=trim_2
    )
    if len(shot_data["signals"]) == 0:
        return []
    signal = shot_data["signals"][0]
    signal_data = signal["data"]
    times = signal["times"]

    labels = []
    start = None
    finish = None

    # Open an event on the first upward threshold crossing and close it on the
    # next downward crossing. An event still open at the end is discarded.
    for value_index, value in enumerate(signal_data):
        if value > threshold and start is None:
            start = times[value_index]
        if value < threshold and start is not None:
            finish = times[value_index]
        if start is not None and finish is not None:
            # Each label dictionary carries an entry for every possible label.
            label = {possible_label: None for possible_label in data_coordinator.all_labels}
            label["Anomaly"] = True
            # Pad the event window by 10 seconds on each side.
            label["T1"] = pd.Timestamp(start) - pd.Timedelta(seconds=10)
            label["T2"] = pd.Timestamp(finish) + pd.Timedelta(seconds=10)
            labels.append(label)
            start = None
            finish = None
    return labels
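
Because an autolabeler is an ordinary Python function, it can be exercised outside dFL. The stub coordinator below is hypothetical, with canned data; note how the parameter keys arrive prefixed with the autolabeler's registry name ("threshold_"), matching the lookups in the function above.

=== "Python"

import pandas as pd

class StubCoordinator:
    # Hypothetical stand-in for the dFL data coordinator, for local testing only.
    data_folder = "data"
    all_labels = ["Peak", "Dip", "Anomaly"]

    def fetch_data_async(self, data_folder, dataset_id, shot_id, signals, options,
                         trim_1=None, trim_2=None):
        times = list(pd.date_range("2024-01-01", periods=6, freq="min"))
        return {"signals": [{"data": [0, 5, 9, 9, 2, 0], "times": times}]}

labels = perform_threshold_autolabeling(
    "weather_data_2", 1, StubCoordinator(),
    {"threshold_signals": ["Temperature"], "threshold_autolabel_threshold": 7},
)
print(labels)  # one event covering the two samples above the threshold, padded by 10 s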


def perform_temperature_peaks_autolabeling(
    dataset_id, shot_id, data_coordinator, additional_parameters, trim_1=None, trim_2=None
):
    print(f"autolabeling {shot_id}")
    if shot_id is None:
        return []
    shot_data = data_coordinator.fetch_data_async(
        data_coordinator.data_folder, dataset_id, shot_id, ["Temperature"], {}, trim_1=trim_1, trim_2=trim_2
    )
    if len(shot_data["signals"]) == 0:
        return []
    signal = shot_data["signals"][0]
    signal_data = signal["data"]
    times = pd.to_datetime(signal["times"])
    formatted_dates = [dt.strftime("%Y-%m-%d") for dt in times]

    labels = []
    # find_peaks returns the indices of local maxima in the signal.
    peaks, _ = find_peaks(signal_data)
    for peak in peaks:
        label = {possible_label: None for possible_label in data_coordinator.all_labels}
        label["Peak"] = True
        # Point events: T1 and T2 are the same timestamp.
        label["T1"] = formatted_dates[peak]
        label["T2"] = formatted_dates[peak]
        labels.append(label)
    return labels


autolabeling_dictionary = {
    "temperature_peaks": {
        "function": perform_temperature_peaks_autolabeling,
        "parameters": {},  # no user-configurable parameters
        "display_name": "Temperature Peaks",
    },
    "threshold": {
        "function": perform_threshold_autolabeling,
        # These values reach the function keyed as "threshold_autolabel_threshold"
        # and "threshold_signals" (prefixed with the autolabeler's registry name).
        "parameters": {
            "autolabel_threshold": {
                "display_name": "Threshold",
                "default": 0,
                "min": None,  # None leaves the value unbounded in the interface
                "max": None,
            },
            "signals": {
                "display_name": "Signals",
                "default": ["Temperature"],
                "options": {
                    "Temperature": "Temperature",
                    "Pressure": "Pressure",
                    "Precipitation": "Precipitation",
                },
            },
        },
        "display_name": "Threshold",
    },
}
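
As a final illustration of the bulk mode mentioned at the top of this page, the sketch below applies one registered autolabeler to every shot in a dataset. run_autolabeler_in_bulk is hypothetical; it assumes a constructed data_coordinator object and the fetch_shot_ids_for_dataset_id callable from the configuration above, and dFL's own dispatch may differ.

=== "Python"

def run_autolabeler_in_bulk(autolabeler_key, dataset_id, data_coordinator, additional_parameters):
    # Look up the registered entry and run its function over every shot.
    entry = autolabeling_dictionary[autolabeler_key]
    results = {}
    for shot_id in fetch_shot_ids_for_dataset_id(dataset_id):
        results[shot_id] = entry["function"](
            dataset_id, shot_id, data_coordinator, additional_parameters
        )
    return results

results = run_autolabeler_in_bulk(
    "threshold", "weather_data_2", data_coordinator,
    {"threshold_signals": ["Temperature"], "threshold_autolabel_threshold": 30},
)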