Utility Scripts for Ingestion
Utility Python scripts let you split your data provider logic into modules that are easier to reuse and extend as the provider grows. In the example below, we ingest the same DEMOS weather dataset but add custom graphs, filters, and data manipulations that dFL can call at runtime. This keeps the core provider focused on I/O and schema, while the utilities handle smoothing, normalization, trimming, and visualization.
When to Use Utility Scripts
- You need reusable signal transforms (e.g., moving averages, robust scaling)
- You want a clear separation between data access (provider) and analytics/UI helpers (utilities)
- You want to register multiple custom graphers in dFL without bloating the provider
- You want to keep the data provider minimal, testable, and maintainable
Recommended Layout
- `weather_provider.py`: lightweight provider (fetch, metadata, config)
- `weather_utilities.py`: transforms, plotting helpers, trimming
- `template_layouts.py`: shared Plotly/figure layout options
- `auto_labeling.py`: optional labeling rules
Capabilities Demonstrated Below
- Robust parsing of timestamps and numeric inputs (`parse_value`)
- Timezone-aware trim operations on time series (`find_nearest_datetime_index`, `trim_data`)
- Signal transforms such as simple moving average and robust scaling
- Hybrid plotting: create Seaborn/Matplotlib figures, then embed them in Plotly (as base64-encoded PNGs) for a Plotly-based UI
- dFL integration: graphers consume the same `app_control_parameters` and `parameters` that dFL passes at runtime
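The epoch-unit heuristic behind `parse_value` can be tried in isolation. The sketch below is a minimal standalone version, assuming only pandas; `guess_epoch_unit` and `epoch_to_utc` are illustrative names, not part of the provider. It counts digits in the integer part of the number, so a float such as `1.7e9` (which prints as `1700000000.0`) is still recognized as seconds.

```python
import pandas as pd

# Digit count of the integer part -> pandas epoch unit (same table as parse_value).
_DIGITS_TO_UNIT = {10: "s", 13: "ms", 16: "us", 19: "ns"}


def guess_epoch_unit(value: float) -> str:
    """Guess the epoch unit of a numeric timestamp; default to milliseconds."""
    # Use the integer part only, so "1700000000.0" is counted as 10 digits.
    digits = len(str(abs(int(value))))
    return _DIGITS_TO_UNIT.get(digits, "ms")


def epoch_to_utc(value: float) -> pd.Timestamp:
    """Convert a numeric epoch value to a UTC timestamp using the guessed unit."""
    return pd.to_datetime(value, unit=guess_epoch_unit(value), utc=True)


if __name__ == "__main__":
    for v in (1_700_000_000, 1_700_000_000_000, 1.7e9):
        print(v, guess_epoch_unit(v), epoch_to_utc(v))
```

Lengths outside the table fall back to milliseconds, so for unusual inputs the unit is a guess rather than a detection.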
Utility Functions Code
```python
"""
Weather Data Provider Utilities

This module contains utility functions for processing weather data, including:
- Data parsing and validation
- Time series operations
- Data smoothing and normalization
- Visualization helpers

These utilities support the main weather data provider by handling common
data processing tasks and mathematical operations.
"""
from __future__ import annotations

import base64
import io
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, TypedDict, Union

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from numpy.typing import NDArray

matplotlib.use('Agg')  # Use non-interactive backend to avoid GUI warnings


# ---- Typed return payloads ---------------------------------------------------
class SignalRecord(TypedDict):
    data: NDArray[Any]  # e.g., float, int; stays generic
    data_name: str
    times: NDArray[Any]  # typically datetime64[ns] values in UTC
    errored: bool


class FetchDataResult(TypedDict):
    id: str
    signals: List[SignalRecord]
    errored_signals: List[str]


def parse_value(value: Any) -> Union[datetime, float, int, str, None]:
    """
    Parse a value to its most appropriate data type (datetime, float, int, or original).

    This function attempts to convert input values in the following priority order:
    1. Datetime (with automatic unit detection for numeric timestamps)
    2. Float
    3. Integer
    4. Original value (if all conversions fail)

    Args:
        value: Input value to parse (can be string, number, or datetime)

    Returns:
        Parsed value in most appropriate type, or None for empty strings

    Timestamp Unit Detection (digits in the integer part of a number):
        - 10 digits: seconds since epoch
        - 13 digits: milliseconds since epoch
        - 16 digits: microseconds since epoch
        - 19 digits: nanoseconds since epoch
        - Default: milliseconds if uncertain

    Note:
        Because numbers are tried as epoch timestamps first, numeric inputs
        (other than NaN or out-of-range values) are returned as datetimes.
    """
    # Empty strings carry no value
    if isinstance(value, str) and len(value) == 0:
        return None
    # First, try to parse as datetime
    try:
        if isinstance(value, (int, float)):
            # Count digits in the integer part only: str(1.7e9) is
            # "1700000000.0", which would otherwise be miscounted
            length = len(str(abs(int(value))))
            if length == 10:
                unit = "s"  # Seconds
            elif length == 13:
                unit = "ms"  # Milliseconds
            elif length == 16:
                unit = "us"  # Microseconds
            elif length == 19:
                unit = "ns"  # Nanoseconds
            else:
                # Default to milliseconds if unsure
                unit = "ms"
            # Use UTC timezone to avoid timezone conversion issues
            dt = pd.to_datetime(value, unit=unit, utc=True)
        else:
            dt = pd.to_datetime(value, utc=True)  # For strings or already datetime objects
        if pd.isna(dt):  # Check if the result is NaT
            raise ValueError("value parsed to NaT")
        return dt.to_pydatetime()
    except (ValueError, TypeError, OverflowError):
        pass
    # If not a datetime, try to convert to float
    try:
        return float(value)
    except (ValueError, TypeError):
        pass
    # If float conversion failed, try to convert to int
    try:
        return int(value)
    except (ValueError, TypeError):
        pass
    # If all conversions fail, return the original value
    return value


def find_nearest_datetime_index(
    arr: NDArray[Any],
    target_datetime: Optional[datetime]
) -> Optional[int]:
    """
    Find the index of the nearest datetime in an array to a target datetime.

    Handles both timezone-aware and naive datetime objects consistently by
    normalizing timezone information between the array and target.

    Args:
        arr (array-like): Array of datetime values to search
        target_datetime (datetime): Target datetime to find nearest match for

    Returns:
        int or None: Index of nearest datetime, or None if the target is None,
        the array is empty, or an error occurs

    Timezone Handling:
        - If target has timezone but array doesn't: removes target timezone
        - If target lacks timezone but array has it: adds UTC timezone to target
        - Ensures consistent comparison by aligning timezone information
    """
    if target_datetime is None:
        return None
    # Convert array to numpy array if it's not already
    arr = np.asarray(arr)
    if len(arr) == 0:
        return None
    # Check if the array contains timezone-aware datetimes
    try:
        sample_time = pd.Timestamp(arr[0])
        arr_has_timezone = sample_time.tzinfo is not None
    except (TypeError, ValueError):
        arr_has_timezone = False
    # Check if target datetime has timezone
    target_has_timezone = getattr(target_datetime, "tzinfo", None) is not None
    # Make timezone consistent between array and target
    if target_has_timezone and not arr_has_timezone:
        # Remove timezone from target to match array
        target_datetime = target_datetime.replace(tzinfo=None)
    elif not target_has_timezone and arr_has_timezone:
        # Add UTC timezone to target to match array
        target_datetime = target_datetime.replace(tzinfo=timezone.utc)
    # Convert target to nanoseconds since epoch. pd.Timestamp(...).value treats
    # naive values as UTC wall time, matching numpy's datetime64 convention;
    # datetime.timestamp() would interpret naive values in the local timezone.
    try:
        target_ns = pd.Timestamp(target_datetime).value
    except (TypeError, ValueError) as e:
        print(f"Error converting target datetime to timestamp: {e}")
        return None
    try:
        # Normalize the array to UTC (naive values are localized as UTC, so wall
        # times are preserved), drop the zone, and express it in int64 ns
        arr_ns = (
            pd.to_datetime(arr, utc=True)
            .tz_localize(None)
            .to_numpy()
            .astype("datetime64[ns]")
            .astype(np.int64)
        )
        # Find the index of the nearest value
        return int(np.abs(arr_ns - target_ns).argmin())
    except Exception as e:  # pylint: disable=broad-except
        print(f"Error finding nearest datetime index: {e}")
        return None


def simple_moving_average(
    _record_name: str,
    _signal_name: str,
    raw_signal: NDArray[Any],
    _times: NDArray[Any],
    parameters: Dict[str, Any]
) -> NDArray[Any]:
    """
    Apply simple moving average smoothing to a 1D array.

    This function smooths data using a uniform window and pads edges to maintain
    the original array length. Window size is validated and constrained to
    reasonable bounds.

    Args:
        _record_name (str): The name of the record being analyzed (unused)
        _signal_name (str): The name of the signal (unused)
        raw_signal (array-like): Input data to smooth
        _times (array-like): Time array of the record (unused)
        parameters (dict): Dictionary containing smoothing parameters

    Parameters Dictionary Keys:
        - "simple_moving_average_moving_average_window_size" or "moving_average_window_size": int
          Window size for moving average (default: 1, min: 1, max: data length)

    Returns:
        numpy.array: Smoothed data with same length as input

    Edge Handling:
        - Uses 'valid' convolution then pads edges with first/last smoothed values
        - Left padding: first smoothed value repeated
        - Right padding: last smoothed value repeated
    """
    # Convert input to numpy array
    data = np.asarray(raw_signal)
    if data.size == 0:
        return data  # Nothing to smooth
    # Try both parameter naming conventions for compatibility
    window_size = parameters.get(
        "simple_moving_average_moving_average_window_size",
        parameters.get("moving_average_window_size", 1),
    )
    window_size = int(window_size)
    if window_size < 1:
        print("Window size must be >= 1; using 1")
        window_size = 1
    if window_size > data.shape[0]:
        print(f"Window size must not exceed the data length; using {data.shape[0]}")
        window_size = data.shape[0]
    # Create weights for simple moving average
    weights = np.ones(window_size) / window_size
    # Apply convolution
    smoothed = np.convolve(data, weights, mode="valid")
    # Pad the edges to maintain original length
    padding = window_size - 1
    left_pad = np.full(padding // 2, smoothed[0])
    right_pad = np.full(padding - padding // 2, smoothed[-1])
    return np.concatenate([left_pad, smoothed, right_pad])


def robust_scaling(
    record_name: str,
    signal_name: str,
    raw_signal: NDArray[Any],
    times: NDArray[Any],
    parameters: Dict[str, Any]
) -> NDArray[Any]:
    """
    Apply robust scaling to a 1D array with comprehensive edge case handling.

    Robust scaling uses median and interquartile range (IQR) instead of mean
    and standard deviation, making it less sensitive to outliers.

    Formula: (x - median) / IQR

    Args:
        record_name (str): The name of the record being analyzed (unused)
        signal_name (str): The name of the signal (unused)
        raw_signal (array-like): Input data to scale
        times (array-like): Time array of the record (unused)
        parameters (dict): The dictionary of available parameters (unused)

    Returns:
        numpy.array: Robustly scaled data

    Edge Case Handling:
        - Empty input: returns empty array
        - Single element: returns zero
        - All identical values: returns zeros
        - Zero IQR with non-constant data: falls back to standard deviation
        - Extreme values: clipped to ±1e6 to prevent infinity
    """
    data = np.asarray(raw_signal, dtype=float)
    # Handle empty input
    if data.size == 0:
        return np.array([])
    # Handle single-element edge case (checked before the IQR logic)
    if data.size == 1:
        return np.array([0.0])  # Single value becomes zero after scaling
    # Calculate robust statistics
    median = np.median(data)
    q1, q3 = np.percentile(data, [25, 75])
    iqr = q3 - q1
    # Handle edge cases
    if iqr == 0:
        # Case 1: All values identical
        if np.all(data == data[0]):
            return np.zeros_like(data)
        # Case 2: Use std dev as fallback for non-constant data with zero IQR
        iqr = np.std(data)
        # Final safeguard if std dev is also zero
        if iqr == 0:
            iqr = 1.0
    # Apply scaling
    scaled_data = (data - median) / iqr
    # Clip extreme values to prevent +/- infinity (optional)
    return np.clip(scaled_data, -1e6, 1e6)


def matplotlib_to_plotly_base64(fig_mpl: matplotlib.figure.Figure) -> str:
    """
    Convert matplotlib figure to base64-encoded PNG for embedding in Plotly.

    This utility enables complex seaborn plots that can't be easily replicated
    in Plotly to be embedded as images within Plotly figures.

    Args:
        fig_mpl (matplotlib.figure.Figure): Matplotlib figure to convert

    Returns:
        str: Base64-encoded PNG data URL ready for use in Plotly layout images

    Memory Management:
        - Automatically closes matplotlib figure after conversion
        - Uses BytesIO buffer for efficient memory usage
        - High DPI (150) for crisp image quality
    """
    buf = io.BytesIO()
    fig_mpl.savefig(buf, format='png', bbox_inches='tight', dpi=150)
    buf.seek(0)
    img_base64 = base64.b64encode(buf.read()).decode('utf-8')
    buf.close()
    plt.close(fig_mpl)  # Important: close the matplotlib figure to free memory
    return f"data:image/png;base64,{img_base64}"


def _parse_trim_value(trim: Optional[str], has_timezone: bool) -> Optional[datetime]:
    """Parse one trim bound and align its timezone with the times array."""
    if trim is None or trim == "":
        return None
    # Use parse_value to handle different timestamp formats
    parsed = parse_value(trim)
    if not isinstance(parsed, datetime):
        # Non-temporal input cannot be used as a trim bound
        print(f"Ignoring unparseable trim value: {trim!r}")
        return None
    # Make timezone consistent with times array
    if parsed.tzinfo is not None and not has_timezone:
        parsed = parsed.replace(tzinfo=None)
    elif parsed.tzinfo is None and has_timezone:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def trim_data(
    record_data: FetchDataResult,
    trim_1: Optional[str] = None,
    trim_2: Optional[str] = None
) -> FetchDataResult:
    """
    Trim record data based on time range specifications.

    This function handles timezone-aware trimming by:
    1. Detecting timezone format in existing data
    2. Parsing trim values with timezone consistency
    3. Finding nearest time indices for trimming
    4. Applying trim to all signals in the record

    Args:
        record_data (FetchDataResult): Record data dictionary containing signals
        trim_1 (str, optional): Start time for trimming (various formats accepted)
        trim_2 (str, optional): End time for trimming (various formats accepted)

    Returns:
        FetchDataResult: Modified record_data with trimmed signals

    Record Data Structure:
        Expected: {"signals": [list of signal dictionaries]}
        Signal: {"data": array, "times": array, ...}

    Trimming Logic:
        - trim_1 only: trim from start time to end of data
        - trim_2 only: trim from beginning of data to end time
        - Both: trim to the specified time range
        - Neither: return data unchanged
        Both bounds are inclusive of the sample nearest to them.
    """
    signals = []
    for signal in record_data["signals"]:
        data = signal["data"]
        times = signal["times"]
        # Check whether the times array holds timezone-aware datetimes
        has_timezone = False
        if len(times) > 0:
            try:
                has_timezone = pd.Timestamp(times[0]).tzinfo is not None
            except (TypeError, ValueError):
                has_timezone = False
        trim1 = _parse_trim_value(trim_1, has_timezone)
        trim2 = _parse_trim_value(trim_2, has_timezone)
        # find_nearest_datetime_index returns None for a None bound
        index1 = find_nearest_datetime_index(times, trim1)
        index2 = find_nearest_datetime_index(times, trim2)
        # A None start/stop means "no bound"; +1 keeps the sample nearest trim_2
        stop = None if index2 is None else index2 + 1
        signal["data"] = data[index1:stop]
        signal["times"] = times[index1:stop]
        signals.append(signal)
    record_data["signals"] = signals
    return record_data
```
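To make the formula in `robust_scaling` concrete, here is a minimal standalone comparison against a mean/standard-deviation z-score on data with one large outlier. The helper names `robust_scale` and `z_score` are illustrative only, and the edge-case handling of the full utility is deliberately omitted.

```python
import numpy as np


def robust_scale(x: np.ndarray) -> np.ndarray:
    """(x - median) / IQR, the core formula behind robust_scaling."""
    q1, q3 = np.percentile(x, [25, 75])
    return (x - np.median(x)) / (q3 - q1)


def z_score(x: np.ndarray) -> np.ndarray:
    """(x - mean) / std, shown only for comparison."""
    return (x - x.mean()) / x.std()


if __name__ == "__main__":
    data = np.array([1.0, 2.0, 3.0, 4.0, 100.0])  # one large outlier
    print("robust:", robust_scale(data))  # inliers stay 0.5 apart
    print("z-score:", z_score(data))      # inliers squeezed together
```

Here the median is 3 and the IQR is 4 - 2 = 2, so the four inliers map to -1, -0.5, 0 and 0.5 while the outlier lands at 48.5. The z-score, whose mean and standard deviation are pulled toward the outlier, compresses the inliers into a narrow band near -0.5.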
Wiring Utilities into a Provider
Your data provider can import and register these utilities so they are callable from dFL graphers, callbacks, or preprocessing hooks. The provider remains the single point of configuration while utilities encapsulate the heavy lifting.
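In the examples on this page, dFL parameter keys appear to be prefixed with the name of the function that consumes them (for example `seaborn_plotly_example_multiplier` or `simple_graph_text_parameter_1`), and `simple_moving_average` also accepts an unprefixed fallback. A small lookup helper can keep that convention in one place. `get_param` below is a hypothetical helper sketched for illustration; it is not a dFL API.

```python
from typing import Any, Callable, Dict


def get_param(
    parameters: Dict[str, Any],
    func_name: str,
    key: str,
    default: Any,
    cast: Callable[[Any], Any] = lambda v: v,
) -> Any:
    """Look up '<func_name>_<key>', then the bare '<key>', then fall back to default.

    Hypothetical helper mirroring the key naming seen in this example;
    `cast` converts values that may arrive from the GUI as strings.
    """
    prefixed = f"{func_name}_{key}"
    if prefixed in parameters:
        return cast(parameters[prefixed])
    if key in parameters:
        return cast(parameters[key])
    return default


if __name__ == "__main__":
    params = {"seaborn_plotly_example_multiplier": "2.5"}
    print(get_param(params, "seaborn_plotly_example", "multiplier", 1.0, float))
```

The default is returned as-is rather than cast, so callers can pass a sentinel such as `None` to detect a missing parameter.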
Complete Provider Example
```python
"""
Weather Data Provider Module

This module provides a data provider interface for weather data stored in parquet files.
It handles data fetching, filtering, trimming, and provides configuration for various
data processing and visualization options.

Key Features:
- Loads weather data from local parquet files
- Supports time-based data filtering and trimming with timezone awareness
- Provides custom graphing functions using Seaborn and Plotly
- Configures data processing options (smoothing, normalization)
- Includes robust error handling and logging

Data Flow:
1. Scan data folder for available parquet files (record IDs)
2. Extract available signals from the first data file
3. Provide fetch_data function for on-demand data loading
4. Provide custom graphing and processing functions
5. Return a complete provider configuration dictionary
"""
from __future__ import annotations

import logging
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional, TypedDict, Union

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import seaborn as sns
from numpy.typing import NDArray

from auto_labeling import autolabeling_dictionary
from template_layouts import layout_options
from weather_utilities import (
    FetchDataResult,
    SignalRecord,
    matplotlib_to_plotly_base64,
    robust_scaling,
    simple_moving_average,
    trim_data,
)

matplotlib.use('Agg')  # Use non-interactive backend to avoid GUI warnings


# ---- Typed return payloads ---------------------------------------------------
class DataCoordinatorInfo(TypedDict, total=False):
    # Core data access
    fetch_data: Any  # callable; leaving as Any to avoid recursive types
    dataset_id: str
    fetch_record_ids_for_dataset_id: Any
    # Schema / metadata
    all_possible_signals: List[str]
    is_date: bool
    data_folder: Path
    # Processing / analysis options
    custom_smoothing_options: Optional[Any]
    custom_normalizing_options: Optional[Any]
    trim_data: Optional[Any]
    # Labeling / classification
    auto_label_function_dictionary: Optional[Dict[str, Any]]
    all_labels: List[str]
    # Visualization / UI
    custom_grapher_dictionary: Optional[Dict[str, Any]]
    layout_options: Optional[Any]
    default_trim_1: Optional[str]
    default_trim_2: Optional[str]


def seaborn_plotly_example(
    app_control_parameters: Dict[str, Any],
    parameters: Dict[str, Any]
) -> go.Figure:
    """
    Generate a Plotly line chart with Seaborn-like styling.

    This function demonstrates creating a Plotly figure directly while
    mimicking the aesthetic of a Seaborn plot. It fetches data, applies a
    multiplier, and styles the chart.

    Args:
        app_control_parameters (dict): Global parameters from the application controller
        parameters (dict): Parameters specific to this graphing function

    Parameters Keys:
        - "seaborn_plotly_example_multiplier": float (optional)
        - "seaborn_plotly_example_signal_option": str (optional)

    Returns:
        plotly.graph_objects.Figure: A Plotly figure object
    """
    print(f"Parameters: {parameters}")
    # Coerce to float in case the GUI passes the multiplier as a string
    multiplier = float(parameters.get("seaborn_plotly_example_multiplier", 1.0))
    signal_option = parameters.get("seaborn_plotly_example_signal_option", "Temperature")
    data_coordinator = app_control_parameters["data_coordinator"]
    record_data = data_coordinator.fetch_data_async(
        data_coordinator.data_folder,
        dataset_id=None,
        record_id=app_control_parameters["record_id"],
        signals=[signal_option],
        global_data_params={},
        trim_1=app_control_parameters["trim_t1"],
        trim_2=app_control_parameters["trim_t2"],
        timeout=10,
    )
    signal = record_data["signals"][0]["data"]
    times = record_data["signals"][0]["times"]
    # Ensure data is in a consistent numpy format
    signal = np.array(signal).flatten()
    times = np.array(times).flatten()
    # Apply multiplier
    signal = signal * multiplier
    # Create Plotly figure directly
    fig = go.Figure()
    # Add the line trace
    fig.add_trace(go.Scatter(
        x=times,
        y=signal,
        mode='lines',
        line=dict(width=2.5),
        name=signal_option
    ))
    # Update layout to match seaborn style
    fig.update_layout(
        title=dict(
            text=f"{signal_option} Over Time (Multiplier: {multiplier})",
            font=dict(size=16),
            y=0.95
        ),
        xaxis=dict(
            title="Time",
            showgrid=True,
            gridcolor='rgba(0,0,0,0.1)',
            zeroline=False
        ),
        yaxis=dict(
            title=signal_option,
            showgrid=True,
            gridcolor='rgba(0,0,0,0.1)',
            zeroline=False
        ),
        plot_bgcolor='rgba(0,0,0,0)',
        paper_bgcolor='rgba(0,0,0,0)',
        template="plotly_white" if not app_control_parameters["theme_value"] else "plotly_dark",
        margin=dict(l=50, r=50, t=80, b=50)
    )
    return fig


def seaborn_graph_example(
    app_control_parameters: Dict[str, Any],
    parameters: Dict[str, Any]
) -> go.Figure:
    """
    Generate a Seaborn plot and embed it as an image within a Plotly figure.

    This function acts as a wrapper around get_figure_for_callback to generate
    a plot using Matplotlib/Seaborn and then display it in the Plotly-based UI.

    Args:
        app_control_parameters (dict): Global parameters from the application controller
        parameters (dict): Parameters specific to this graphing function

    Returns:
        plotly.graph_objects.Figure: A Plotly figure containing the Seaborn plot image
    """
    print(f"Parameters: {parameters}")
    # Coerce to float in case the GUI passes the multiplier as a string
    multiplier = float(parameters.get("seaborn_graph_example_multiplier", 1.0))
    signal_option = parameters.get("seaborn_graph_example_signal_option", "Temperature")
    return get_figure_for_callback(
        multiplier=multiplier,
        signal_option=signal_option,
        record_id=app_control_parameters["record_id"],
        t1=app_control_parameters["t1"],
        t2=app_control_parameters["t2"],
        trim_t1=app_control_parameters["trim_t1"],
        trim_t2=app_control_parameters["trim_t2"],
        selection_toggle=app_control_parameters["selection_toggle_on"],
        display_labels_toggle=app_control_parameters["display_labels"],
        labels_table=app_control_parameters["label_data"],
        proposed_labels=app_control_parameters["proposed_labels_data"],
        labels_table_selected_rows=app_control_parameters["labels_table_selected_rows"],
        figure_rendered=app_control_parameters["figure_rendered"],
        label_value_combinations=app_control_parameters["label_value_combinations"],
        data_coordinator=app_control_parameters["data_coordinator"],
        theme_value=app_control_parameters["theme_value"],
    )


def get_figure_for_callback(
    multiplier: float,
    signal_option: str,
    record_id: str,
    t1: Any,
    t2: Any,
    trim_t1: Optional[str],
    trim_t2: Optional[str],
    selection_toggle: bool,
    display_labels_toggle: bool,
    labels_table: Any,
    proposed_labels: Any,
    labels_table_selected_rows: List[int],
    figure_rendered: bool,
    label_value_combinations: Any,
    data_coordinator: Any,
    theme_value: bool,
) -> go.Figure:
    """
    Core logic for creating a Seaborn plot and embedding it in Plotly.

    This function fetches data, processes it, generates a line plot with
    Seaborn, converts the plot to a base64 image, and embeds it into a
    Plotly figure for display.

    Args:
        (various): This function takes a large number of parameters from the
            application controller to define the plot state.

    Returns:
        plotly.graph_objects.Figure: Plotly figure with embedded image.
    """
    record_data = data_coordinator.fetch_data_async(
        data_coordinator.data_folder,
        dataset_id=None,
        record_id=record_id,
        signals=[signal_option],
        global_data_params={},
        trim_1=trim_t1,
        trim_2=trim_t2,
        timeout=10,
    )
    signal = record_data["signals"][0]["data"]
    times = record_data["signals"][0]["times"]
    # Force conversion to plain numpy arrays and flatten if needed
    signal = np.array(signal).flatten()
    times = np.array(times).flatten()
    # Apply multiplier
    signal = signal * multiplier
    # Style names live in plt.style.available (they are not attributes of
    # plt.style). Use a context so the style does not leak into global rcParams.
    style_name = 'seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'default'
    with plt.style.context(style_name):
        fig_mpl, ax = plt.subplots(figsize=(10, 6))
        # Ensure completely clean data types for Seaborn
        times_clean = [pd.Timestamp(t).to_pydatetime() for t in times]
        signal_clean = [float(s) for s in signal]
        # Create DataFrame for seaborn
        df = pd.DataFrame({'Time': times_clean, 'Signal': signal_clean})
        # Use seaborn for plotting
        sns.lineplot(data=df, x='Time', y='Signal', ax=ax, linewidth=2.5)
        ax.set_title(f"{signal_option} Over Time (Multiplier: {multiplier})", fontsize=16, pad=20)
        ax.set_xlabel("Time", fontsize=12)
        ax.set_ylabel(signal_option, fontsize=12)
        ax.grid(True, alpha=0.3)
        # Convert to base64 (this also closes the matplotlib figure)
        img_base64 = matplotlib_to_plotly_base64(fig_mpl)
    # Create Plotly figure with embedded seaborn plot
    fig = go.Figure()
    fig.add_layout_image(
        dict(
            source=img_base64,
            xref="paper", yref="paper",
            x=0, y=1, sizex=1, sizey=1,
            sizing="contain", layer="below"
        )
    )
    # Update layout for a clean appearance
    fig.update_layout(
        xaxis=dict(showgrid=False, showticklabels=False, zeroline=False),
        yaxis=dict(showgrid=False, showticklabels=False, zeroline=False),
        margin=dict(l=0, r=0, t=0, b=0),
        template="plotly_white" if not theme_value else "plotly_dark"
    )
    return fig


def simple_graph(
    app_control_parameters: Dict[str, Any],
    parameters: Dict[str, Any]
) -> str:
    """
    A simple example of a custom graph function that returns text.

    This demonstrates the basic structure of a grapher function, which can
    return strings, dictionaries, or Plotly figures.

    Args:
        app_control_parameters (dict): Global parameters from the app (unused)
        parameters (dict): Parameters specific to this function

    Returns:
        str: A string with the text parameter value.
    """
    print(f"Parameters: {parameters}")
    target_key = "simple_graph_text_parameter_1"
    text_to_return = "Simple Graph"
    if target_key in parameters:
        text_to_return = f"Simple Graph: {parameters[target_key]}"
    return text_to_return


# pylint: disable=too-many-statements
def get_provider(_: Any) -> DataCoordinatorInfo:
    """
    Get the complete data provider configuration for the Weather App.

    This function sets up and returns a comprehensive configuration dictionary
    that defines the entire data provider interface, including data access,
    processing options, and UI elements.

    Args:
        _ (Any): Unused parameter for compatibility with the interface.

    Returns:
        ...
```
How dFL Uses These Utilities
- Transforms as knobs: expose parameters (e.g., `moving_average_window_size`) in your GUI so users can tune smoothing or scaling at runtime
- Graphers as views: register functions like `seaborn_plotly_example` and `seaborn_graph_example` so analysts can switch between native Plotly lines and Seaborn-styled figures embedded in Plotly
- Trimming everywhere: call `trim_data()` either inside the provider (pre-fetch) or as a post-fetch pipeline step to ensure consistent time windows across signals
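For the "transforms as knobs" idea, it helps to see what the window size actually changes. This standalone sketch re-creates the edge-padding scheme documented for `simple_moving_average` ('valid' convolution, then repeating the first and last smoothed values); `moving_average_same_length` is an illustrative name, not part of the provider.

```python
import numpy as np


def moving_average_same_length(x: np.ndarray, window: int) -> np.ndarray:
    """Uniform moving average, edge-padded back to the input length."""
    x = np.asarray(x, dtype=float)
    if x.size == 0:
        return x  # np.convolve rejects empty input
    # Clamp the window to [1, len(x)], as the provider utility does
    window = max(1, min(int(window), x.size))
    smoothed = np.convolve(x, np.ones(window) / window, mode="valid")
    pad = window - 1
    # Repeat the first/last smoothed values; the left side gets pad // 2 values
    left = np.full(pad // 2, smoothed[0])
    right = np.full(pad - pad // 2, smoothed[-1])
    return np.concatenate([left, smoothed, right])


if __name__ == "__main__":
    x = np.array([1, 2, 3, 4, 5])
    for w in (1, 3, 4, 10):
        print(w, moving_average_same_length(x, w))
```

With an even window the padding is split unevenly (one fewer value on the left), so the smoothed curve is shifted by half a sample relative to the input. Any window at or above the data length collapses the signal to its overall mean.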
Production Tips
- Prefer pure functions in utilities so they are easy to test
- Centralize timezone logic to prevent subtle bugs
- Keep providers lean (I/O + schema); put heavy transforms/plots in utilities
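One way to follow the "centralize timezone logic" tip is to route every time comparison through a single normalization function. The sketch below assumes one rule for the whole codebase: naive datetimes are interpreted as UTC, and all values are compared as int64 nanoseconds since the epoch. This differs from `find_nearest_datetime_index`, which strips the zone from an aware target when the array is naive. `to_utc_ns`, `nearest_index`, and `trim_inclusive` are hypothetical names used only for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

import numpy as np
import pandas as pd


def to_utc_ns(values) -> np.ndarray:
    """Int64 nanoseconds since the UTC epoch for a sequence of datetimes.

    Assumption: naive values are treated as UTC; aware values are converted.
    """
    utc = pd.to_datetime(pd.Series(values), utc=True)
    # Drop the now-uniform UTC zone and express everything in nanoseconds
    naive = utc.dt.tz_localize(None).to_numpy()
    return naive.astype("datetime64[ns]").astype(np.int64)


def nearest_index(times, target: datetime) -> Optional[int]:
    """Index of the sample in `times` closest to `target`, or None if empty."""
    times_ns = to_utc_ns(times)
    if times_ns.size == 0:
        return None
    target_ns = to_utc_ns([target])[0]
    return int(np.abs(times_ns - target_ns).argmin())


def trim_inclusive(
    data: np.ndarray,
    times: np.ndarray,
    start: Optional[datetime] = None,
    end: Optional[datetime] = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """Keep samples from the one nearest `start` through the one nearest `end`."""
    if len(times) == 0:
        return data, times
    i0 = 0 if start is None else nearest_index(times, start)
    i1 = len(times) if end is None else nearest_index(times, end) + 1
    return data[i0:i1], times[i0:i1]


if __name__ == "__main__":
    times = np.array(
        ["2024-01-01T00:00", "2024-01-01T01:00", "2024-01-01T02:00"],
        dtype="datetime64[ns]",
    )
    # 03:10 at UTC+2 is 01:10 UTC, so the nearest sample is index 1
    plus2 = timezone(timedelta(hours=2))
    print(nearest_index(times, datetime(2024, 1, 1, 3, 10, tzinfo=plus2)))
```

Keeping the naive-means-UTC rule in one function means a change of convention touches a single place instead of every trimming and plotting call.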