Integrating Custom Graphs from Scratch¶
Custom graphs can be integrated into dFL through the custom graph API. While many simple graphs (from matplotlib, for example) can be integrated with just a few lines of code, here we provide an example of how to write a more sophisticated two-dimensional modespyec graph interface (converted from IDL to Python). In this example, modespyec is fully incorporated into the dFL GUI via the data provider ingestion and utilities scripts.
Ingestion scripts in dFL are called Data Providers. To include a custom graph in dFL, one must first define a custom graph python dictionary, which can be located under the 'Configuration Dictionaries' section of the Data Provider script. This dictionary determines the parameters needed for the custom graph, any delimiters required, as well as the display name of those parameters in the GUI. An example for Modespyec is given below.
Modespyec Graphs in dFL¶
The following code snippet must be added to the data provider script.
=== "Python"
# --- Configuration Dictionaries ---
# Registry of custom graphs offered by this data provider, keyed by graph id.
# Each entry carries the GUI display name, the user-editable parameters
# (default, bounds, GUI label) and the callable that builds the figure.
# NOTE(review): get_modespyec reads these values back under keys prefixed with
# the graph id (e.g. "modespyec_tmin_modespyec"); the prefixing happens outside
# this snippet -- confirm against the custom graph API.
custom_grapher_dictionary = {
    "modespyec": {
        "display_name": "modespyec spectrogram",
        "parameters": {
            # Analysis time range; compared against the signal time axis in seconds.
            "tmin_modespyec": {"default": 0.0, "min": 0.0, "max": None, "display_name": "tmin"},
            "tmax_modespyec": {"default": 8.0, "min": 0.0, "max": None, "display_name": "tmax"},
            # Window length and hop between windows, both in samples.
            "blocksize_modespyec": {"default": 800, "min": 0, "max": None, "display_name": "blocksize"},
            "blockstride_modespyec": {"default": 400, "min": 0, "max": None, "display_name": "blockstride"},
            # NOTE(review): wsfft_paired_signal asserts nfft >= blocksize and nfft even.
            "nfft_modespyec": {"default": 1024, "min": 0, "max": None, "display_name": "nfft"},
            # NOTE(review): wsfft_paired_signal asserts nfsmooth > 2 and odd, yet "min" is 0.
            "nfsmooth_modespyec": {"default": 7, "min": 0, "max": None, "display_name": "nfsmooth"},
            # NOTE(review): get_coh_min only falls back to the computed "c95" threshold
            # for negative values, which a minimum of 0.0 appears to rule out.
            "coh_min_modespyec": {"default": 0.90, "min": 0.0, "max": 1.0, "display_name": "coh_min"},
            "eps_int_modespyec": {"default": 0.20, "min": 0.0, "max": 1.0, "display_name": "eps_int"},
            # NOTE(review): get_mode_map asserts 0.0 < p_frac <= 1.0, so exactly 0.0 fails there.
            "pow_frac_modespyec": {"default": 0.10, "min": 0.0, "max": 1.0, "display_name": "pow_frac"},
            # Which of the modespyec views to render (value -> GUI label).
            "modespyec_options": {"default": "spectrogram", "options": {"spectrogram": "spectrogram", "average_psd": "average_psd", "rms_amplitude": "rms_amplitude", "cross_phase_delta": "cross_phase_delta", "coherence": "coherence", "coherence_compare": "coherence_compare"}, "display_name": "modespyec Options"}
        },
        # Callable invoked as function(app_control_parameters, parameters).
        "function": get_modespyec,
    }
}
The configuration dictionary includes a python function called 'get_modespyec', also included in the data provider script.
=== "Python"
from modespyec.modespyec_gui import get_figure_for_callback
def get_modespyec(app_control_parameters, parameters):
    """
    Build the `modespyec` figure requested through the custom graph API.

    Thin adapter: it renames the GUI parameter values and the application
    state entries to the keyword arguments of `get_figure_for_callback`
    and forwards them unchanged.

    Args:
        app_control_parameters (dict): Application-wide state (shot id,
            selection range, label tables, data coordinator, theme, ...).
        parameters (dict): Values of the `modespyec` graph parameters, keyed
            as "modespyec_<parameter name>".

    Returns:
        Whatever `get_figure_for_callback` returns for these inputs.

    Raises:
        KeyError: If an expected key is missing from either dictionary.
    """
    # Graph-specific settings chosen by the user in the GUI.
    graph_settings = {
        "modespyec_option": parameters["modespyec_modespyec_options"],
        "tmin_value": parameters["modespyec_tmin_modespyec"],
        "tmax_value": parameters["modespyec_tmax_modespyec"],
        "blocksize": parameters["modespyec_blocksize_modespyec"],
        "blockstride": parameters["modespyec_blockstride_modespyec"],
        "nfft": parameters["modespyec_nfft_modespyec"],
        "nfsmooth": parameters["modespyec_nfsmooth_modespyec"],
        "coh_min": parameters["modespyec_coh_min_modespyec"],
        "eps_int": parameters["modespyec_eps_int_modespyec"],
        "pow_frac": parameters["modespyec_pow_frac_modespyec"],
    }
    # Application state shared by every graph.
    app_state = {
        "shot_id": app_control_parameters["shot_id"],
        "t1": app_control_parameters["t1"],
        "t2": app_control_parameters["t2"],
        "gz_t1": app_control_parameters["trim_t1"],
        "gz_t2": app_control_parameters["trim_t2"],
        "selection_toggle": app_control_parameters["selection_toggle_on"],
        "display_labels_toggle": app_control_parameters["display_labels"],
        "labels_table": app_control_parameters["label_data"],
        "proposed_labels": app_control_parameters["proposed_labels_data"],
        "labels_table_selected_rows": app_control_parameters["labels_table_selected_rows"],
        "figure_rendered": app_control_parameters["figure_rendered"],
        "label_value_combinations": app_control_parameters["label_value_combinations"],
        "data_coordinator": app_control_parameters["data_coordinator"],
        "theme_value": app_control_parameters["theme_value"],
    }
    return get_figure_for_callback(**graph_settings, **app_state)
In this case, 'modespyec_gui' is a Python script that extracts its features from higher-dimensional magnetic probe data located in the 'data_folder' path. Here, the dependencies are more involved and several Python libraries are loaded.
=== "Python"
from queue import Empty
import matplotlib
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from dash import Patch
from dash import callback_context as ctx
from dash import no_update
from dataclasses import dataclass
from typing import Optional, List, Dict
try:
from . import modespyec
except ImportError:
import modespyec
# Graph types that make_modespyec_graph knows how to render.
modespyec_options = [
    # 2-D maps over time and frequency
    "spectrogram", "average_psd",
    # 1-D traces over time
    "rms_amplitude",
    # 2-D maps derived from the cross-spectrum
    "cross_phase_delta", "coherence", "coherence_compare",
]
def matplotlib_colors_to_plotly_colorscale(color_list):
    """
    Turn a sequence of colors into a Plotly colorscale with evenly spaced stops.

    Args:
        color_list: Iterable of color values (any form Plotly accepts).

    Returns:
        A list of ``(position, color)`` tuples with positions running from
        0.0 to 1.0. An empty input gives an empty list; a single color gives
        two stops (0.0 and 1.0) with that color, because positions cannot be
        spread over a zero-length interval.
    """
    colors = list(color_list)
    if not colors:
        return []
    if len(colors) == 1:
        # Avoid dividing by (n - 1) == 0: a flat scale needs both end stops.
        return [(0.0, colors[0]), (1.0, colors[0])]
    last_index = len(colors) - 1
    return [(float(i) / last_index, color) for i, color in enumerate(colors)]
def _modespyec_message_figure(title, theme_value):
    """Return an empty themed figure whose title carries a status message."""
    fig = go.Figure()
    fig.update_layout(title=title, template=get_modespyec_theme_template(theme_type=theme_value))
    return fig


def _modespyec_fetch_signal(data_coordinator, shot, probe_name):
    """Fetch a single probe signal for `shot`; return None if nothing came back."""
    response = data_coordinator.fetch_data_async(
        data_coordinator.data_folder,
        dataset_id=None,
        shot_id=shot,
        signals=[probe_name],
        global_data_params={},
        trim_1=None,
        trim_2=None,
        timeout=10,
    )
    signals = response["signals"]
    if len(signals) == 0:
        return None
    return signals[0]


def make_modespyec_graph(
    graph_type,
    shot,
    block_size,
    block_stride,
    nfft,
    window,
    nf_smooth,
    t_min,
    t_max,
    coh_min,
    eps_int,
    data_coordinator,
    pow_frac=None,
    theme_value=False,
):
    """
    Build one of the modespyec figures for a shot.

    The default probe pair for the shot is loaded through `data_coordinator`,
    a windowed sliding FFT is computed over ``[t_min, t_max]`` (seconds) and
    the requested view is rendered.

    Args:
        graph_type: One of "spectrogram", "average_psd", "cross_phase_delta",
            "coherence", "coherence_compare" or "rms_amplitude".
        shot: Shot number (anything `int()` accepts).
        block_size, block_stride, nfft, window, nf_smooth: Forwarded to
            `modespyec.wsfft_paired_signal`.
        t_min, t_max: Analysis time range in seconds.
        coh_min: Coherence threshold; a negative value selects the computed
            "c95" threshold (see `get_coh_min`).
        eps_int: Integer-ness tolerance, used by the "rms_amplitude" view.
        data_coordinator: Object providing `fetch_data_async` and `data_folder`.
        pow_frac: Optional power fraction, used by the "spectrogram" view.
        theme_value: Forwarded to `get_modespyec_theme_template`.

    Returns:
        A Plotly figure. When a probe signal is missing or the fetch times
        out, an empty figure with an explanatory title is returned instead.
        An unrecognised `graph_type` yields None.

    Raises:
        NotImplementedError: From `get_default_probe` for unsupported shots.
    """
    shot = int(shot)
    probe_name_1, probe_name_2, delta_theta = get_default_probe(shot)

    # Load both probes the same way. A timeout or an empty answer for either
    # probe ends in a message figure instead of continuing with missing data.
    requests = (
        (probe_name_1, f"No data for shot #{shot}"),
        (probe_name_2, f" no data for shot #{shot}"),
    )
    probe_signals = []
    for probe_name, empty_title in requests:
        try:
            signal = _modespyec_fetch_signal(data_coordinator, shot, probe_name)
        except Empty:
            return _modespyec_message_figure("Error loading data: timeout", theme_value)
        if signal is None:
            return _modespyec_message_figure(empty_title, theme_value)
        probe_signals.append(signal)

    probe1_data = np.asarray(probe_signals[0]["data"])
    probe1_times = np.asarray(probe_signals[0]["times"])
    probe2_data = np.asarray(probe_signals[1]["data"])
    probe2_times = np.asarray(probe_signals[1]["times"])

    # One boolean mask is applied to both probes, so every array must have the
    # same length. Equal lengths are unaffected; otherwise trim to the common
    # prefix.
    # NOTE(review): this assumes both probes share one clock starting at the
    # same sample -- confirm for datasets where the lengths differ.
    n_common = min(len(probe1_times), len(probe2_times), len(probe1_data), len(probe2_data))
    times = probe1_times[:n_common]
    probe1_data = probe1_data[:n_common]
    probe2_data = probe2_data[:n_common]

    # Signal times are scaled by 1e-3 before being compared with t_min/t_max.
    time_sec = times * 1.0e-3
    ts = np.mean(np.diff(time_sec))
    print(f"shot #{shot} --> {probe_name_1.upper()}, {probe_name_2.upper()}")
    print(f"Fs = {1.0e-3 / ts:.1f} kHz")

    range_filter = np.logical_and(time_sec >= t_min, time_sec <= t_max)
    spec = modespyec.wsfft_paired_signal(
        time_sec[range_filter],
        probe1_data[range_filter],
        probe2_data[range_filter],
        block_size,
        block_stride,
        nfft,
        window,
        nf_smooth,
    )
    bbox = get_bbox(spec)

    if graph_type == "spectrogram":
        modespyec_clist = [matplotlib.colors.to_rgb(modespyec.get_color(n)) for n in np.arange(-5, 6, 1)]
        return modespyec_spectorgram_plot(
            modespyec_clist,
            modespyec.get_mode_map(
                spec, delta_theta, get_coh_min(spec=spec, coh_min=coh_min), no_value=0.0, p_frac=pow_frac
            ),
            bbox=bbox,
            shot_id=shot,
            theme_value=theme_value,
        )
    if graph_type == "average_psd":
        return modespyec_plotly(
            np.log10((spec["X11"] + spec["X22"]) / 2.0),
            f"Average PSD ({probe_name_1}, {probe_name_2})",
            bbox=bbox,
            theme_value=theme_value,
        )
    if graph_type == "cross_phase_delta":
        return modespyec_plotly(
            np.round(-1.0 * (180.0 / np.pi) * np.angle(spec["SX12"]) / delta_theta),
            f"cross-phase/delta ({probe_name_1}, {probe_name_2})",
            bbox=bbox,
            theme_value=theme_value,
        )
    if graph_type == "coherence":
        return modespyec_plotly(
            spec["SC12"], f"coherence ({probe_name_1}, {probe_name_2})", bbox=bbox, theme_value=theme_value
        )
    if graph_type == "coherence_compare":
        threshold = get_coh_min(spec=spec, coh_min=coh_min)
        masked = np.copy(spec["SC12"])
        masked[spec["SC12"] < threshold] = 0.0
        return modespyec_plotly(
            masked,
            f"coherence ({probe_name_1}, {probe_name_2}) > {threshold}",
            bbox=bbox,
            theme_value=theme_value,
        )
    if graph_type == "rms_amplitude":
        return rms_amplitude_figure(
            shot=shot, spec=spec, coh_min=coh_min, delta_theta=delta_theta, eps_int=eps_int, theme_value=theme_value
        )
    # Unknown graph types fall through without a figure, as before.
    return None
def get_coh_min(spec, coh_min):
    """
    Resolve the coherence threshold to use.

    A negative `coh_min` means "use the threshold stored in the spectrum"
    (``spec["c95"]``); any other value is returned unchanged.
    """
    if coh_min < 0.0:
        return spec["c95"]
    return coh_min
def get_bbox(spec):
    """
    Return the plot extent of a spectrum as ``[t_first, t_last, f_first, f_last]``.

    Times are the first and last entries of ``spec["tmid"]``; frequencies are
    the first and last entries of ``spec["freq"]`` divided by 1e3.
    """
    block_times = spec["tmid"]
    frequencies = spec["freq"]
    return [block_times[0], block_times[-1], frequencies[0] / 1e3, frequencies[-1] / 1e3]
def get_default_probe(shot_no: int):
    """
    Return the default probe pair for a shot.

    Args:
        shot_no: Shot number.

    Returns:
        A tuple ``(probe_name_1, probe_name_2, delta_theta)``; callers in this
        module divide a phase in degrees by ``delta_theta``.

    Raises:
        NotImplementedError: If the shot is older than the first supported
            shot (144760).
    """
    d_probes = ("mpi66m307d", "mpi66m340d", 33.0)
    e_probes = ("mpi66m307e", "mpi66m340e", 33.0)
    # (first shot of the range, probes), newest range first.
    shot_ranges = ((152000, d_probes), (144783, e_probes), (144760, d_probes))
    for first_shot, probes in shot_ranges:
        if shot_no >= first_shot:
            return probes
    raise NotImplementedError(
        f"no default probe pair is defined for shot #{shot_no} (supported: shots >= 144760)"
    )
def modespyec_plotly(thing: np.array, title: str, bbox: list, theme_value: bool):
    """
    Render a 2-D array as a themed time/frequency image.

    Args:
        thing: 2-D array; rows are drawn along the frequency axis and columns
            along the time axis.
        title: Figure title.
        bbox: ``[t_first, t_last, f_first, f_last]`` extent used to label the axes.
        theme_value: Forwarded to `get_modespyec_theme_template`.

    Returns:
        The Plotly figure.
    """
    axis_names = {"x": "time [sec]", "y": "freq [kHz]"}
    template = get_modespyec_theme_template(theme_type=theme_value)

    # Axis coordinates are spread evenly across the extent, one per column/row.
    time_axis = np.linspace(bbox[0], bbox[1], thing.shape[1])
    freq_axis = np.linspace(bbox[2], bbox[3], thing.shape[0])

    fig = px.imshow(thing, origin="lower", x=time_axis, y=freq_axis, labels=axis_names, aspect="auto")
    fig.update_layout(
        title=title,
        xaxis_title="time [sec]",
        yaxis_title="freq [kHz]",
        template=template,
    )
    return fig
def rms_amplitude_figure(shot, spec, coh_min, delta_theta, eps_int, theme_value: bool):
    """
    Plot one RMS-amplitude trace per mode number n = 5 ... -5 against time.

    Args:
        shot: Shot number, shown in the title.
        spec: Spectrum dictionary as returned by `modespyec.wsfft_paired_signal`.
        coh_min: Coherence threshold (resolved through `get_coh_min`).
        delta_theta: Probe separation passed on to `modespyec.get_amplitude`.
        eps_int: Integer-ness tolerance passed on to `modespyec.get_amplitude`.
        theme_value: Forwarded to `get_modespyec_theme_template`.

    Returns:
        The Plotly figure.
    """
    template = get_modespyec_theme_template(theme_type=theme_value)
    mode_numbers = [5, 4, 3, 2, 1, 0, -1, -2, -3, -4, -5]
    threshold = get_coh_min(spec=spec, coh_min=coh_min)
    amps = modespyec.get_amplitude(spec, mode_numbers, delta_theta, coh_min=threshold, eps_int=eps_int)

    fig = go.Figure()
    for n, amplitude in amps.items():
        trace = go.Scatter(
            x=spec["tmid"],
            y=amplitude,
            mode="lines",
            name=f"n={n}",
            line={"color": modespyec.get_color(n)},
        )
        fig.add_trace(trace)

    layout = {
        "title": f"shot #{shot}",
        "xaxis_title": "time [sec]",
        "yaxis_title": "RMS amplitude [T/s]",
        "template": template,
    }
    fig.update_layout(**layout)
    return fig
def modespyec_spectorgram_plot(modespyec_clist, data_array, bbox, shot_id, theme_value: bool):
    """
    Draw the mode-number map as a heatmap with the modespyec color coding.

    Args:
        modespyec_clist: Sequence of ``(r, g, b)`` tuples with components in
            0..1, ordered from the lowest to the highest mode number.
        data_array: 2-D array of mode numbers (rows: frequency, columns: time).
        bbox: ``[t_first, t_last, f_first, f_last]`` extent.
        shot_id: Shot number, shown in the title.
        theme_value: Forwarded to `get_modespyec_theme_template`.

    Returns:
        The Plotly figure.
    """
    template = get_modespyec_theme_template(theme_type=theme_value)

    # Plotly wants hex strings; the input is RGB triples in 0..1.
    hex_colors = []
    for r, g, b in modespyec_clist:
        hex_colors.append(f"#{int(r * 255):02x}{int(g * 255):02x}{int(b * 255):02x}")
    colorscale = matplotlib_colors_to_plotly_colorscale(hex_colors)

    n_freq, n_time = data_array.shape[0], data_array.shape[1]
    mode_ticks = np.arange(-5, 6, 1)
    fig = go.Figure(
        go.Heatmap(
            z=data_array,
            x=np.linspace(bbox[0], bbox[1], n_time),
            y=np.linspace(bbox[2], bbox[3], n_freq),
            colorscale=colorscale,
            zmin=-5,
            zmax=5,
            colorbar={"tickvals": mode_ticks, "orientation": "h", "y": 1, "xpad": 0},
            xgap=0,
            ygap=0,
        )
    )
    fig.update_layout(
        title=f"{shot_id}: modespyec.get_mode_map",
        xaxis_title="time [sec]",
        yaxis_title="freq [kHz]",
        template=template,
    )
    return fig
def update_figure_for_trigger(
    selected_labels,
    t1,
    t2,
    selection_toggle,
    display_labels_toggle,
    label_data,
    proposed_labels,
    context,
    gz1,
    gz2,
    label_value_combinations,
    data_coordinator,
    theme_value,
):
    """
    Produce a partial update for an already rendered figure.

    The first entry of ``context.triggered`` decides what is updated: the
    x-axis range, the theme template, or the label/selection shapes.

    Args:
        selected_labels: Labels to highlight, or None.
        t1, t2: Bounds of the user selection rectangle.
        selection_toggle: Whether selection mode is on.
        display_labels_toggle: Whether label rectangles are drawn.
        label_data: Labels to draw.
        proposed_labels: Proposed labels to draw.
        context: Callback context exposing ``triggered``, a list of dicts
            with "prop_id" and "value".
        gz1, gz2: Global-zoom bounds for the x-axis.
        label_value_combinations: Passed through to `get_shapes` for coloring.
        data_coordinator: Provides `parse_value`.
        theme_value: Forwarded to `get_modespyec_theme_template`.

    Returns:
        A `Patch` describing the change, or `no_update` when the trigger
        requires nothing.
    """
    # Component id: the part of "component.property" before the dot.
    trigger_id = context.triggered[0]["prop_id"].split(".")[0]

    def update_existing_figure_with_shapes(with_drag=False):
        # Rebuild all label/selection rectangles and patch only layout.shapes.
        config = LabelShapeConfig(
            selected_labels=selected_labels,
            t1=t1,
            t2=t2,
            trim_t1=None,
            trim_t2=None,
            selection_toggle_on=selection_toggle,
            display_labels=display_labels_toggle,
            label_data=label_data,
            proposed_labels_data=proposed_labels,
            label_value_combinations=label_value_combinations,
        )
        shapes = get_shapes(config)
        patched_figure = Patch()
        patched_figure["layout"]["shapes"] = shapes
        # The drag mode is only touched when the selection toggle itself fired.
        if "toggle-user-selection" in trigger_id:
            if with_drag:
                patched_figure["layout"]["dragmode"] = "select"
                patched_figure["layout"]["clickmode"] = "event+select"
                patched_figure["layout"]["selectdirection"] = "h"
            else:
                patched_figure["layout"]["dragmode"] = "zoom"
        return patched_figure

    if trigger_id == "update-global-zoom":
        patch = Patch()
        # NOTE(review): get_figure_for_callback passes gz1/gz2 already parsed
        # and divided by 1000; they are parsed again here, whereas the
        # autorange branch below uses them as given -- confirm which is intended.
        gz_t1 = data_coordinator.parse_value(gz1)
        gz_t2 = data_coordinator.parse_value(gz2)
        xrange = [gz_t1, gz_t2]
        # This assigns the whole xaxis entry, not just its range.
        patch["layout"]["xaxis"] = {"range": xrange}
        return patch
    if trigger_id == "switch":
        patch = Patch()
        # Theme switch: swap only the layout template.
        template = get_modespyec_theme_template(theme_type=theme_value)
        patch["layout"]["template"] = template
        return patch
    label_range_updated = "label-t1" in trigger_id or "label-t2" in trigger_id
    # Any of these triggers changes which rectangles must be drawn.
    if (
        "toggle-user-selection" in trigger_id
        or "update-label-selection" in trigger_id
        or label_range_updated
        or "labels_table" in trigger_id
        or "toggle-display-labels" in trigger_id
    ):
        return update_existing_figure_with_shapes(with_drag=selection_toggle)
    # The checks above look at the first trigger only; the ones below scan all
    # triggered properties.
    selected_labels_trigger = next(
        (d for d in context.triggered if "labels_table.selected_rows" in d["prop_id"]), {}
    ).get("value")
    if selected_labels_trigger is not None:
        return update_existing_figure_with_shapes()
    relayout_data_trigger = next((d for d in context.triggered if "relayoutData" in d["prop_id"]), {}).get("value")
    if relayout_data_trigger is not None:
        if "selections" in relayout_data_trigger:
            selections = relayout_data_trigger["selections"]
            # An empty selection list changes nothing; otherwise redraw the shapes.
            if len(selections) == 0:
                return no_update
            else:
                return update_existing_figure_with_shapes()
    if (
        relayout_data_trigger is not None
        and "xaxis.autorange" in relayout_data_trigger
        and "yaxis.autorange" in relayout_data_trigger
    ):
        # Both axes reset to autorange: restore the global zoom range and
        # return to select mode.
        if relayout_data_trigger["xaxis.autorange"] is True and relayout_data_trigger["yaxis.autorange"] is True:
            patch = Patch()
            xrange = [gz1, gz2]
            patch["layout"]["xaxis"]["range"] = xrange
            patch["layout"]["dragmode"] = "select"
            return patch
    return no_update
def should_update_shapes(context, figure_rendered):
    """
    Decide whether the existing figure can be patched instead of rebuilt.

    Patching is only considered once the figure has been rendered
    (`figure_rendered` is exactly True) and the first triggered property id
    contains one of the known trigger names.
    """
    if figure_rendered is not True:
        return False

    fired = context.triggered[0]["prop_id"]
    patch_triggers = (
        "relayoutData",
        "label-t1",
        "label-t2",
        "toggle-user-selection",
        "update-global-zoom",
        "labels_table",
        "switch",
    )
    for name in patch_triggers:
        if name in fired:
            return True
    return False
def _scale_label_times(labels):
    """Return shallow copies of `labels` with "T1"/"T2" divided by 1000 (None -> [])."""
    if labels is None:
        return []
    scaled = []
    for label in labels:
        # Copy first: the incoming dicts belong to the caller and must not be
        # rescaled in place (a second callback would divide them again).
        rescaled = dict(label)
        rescaled["T1"] = label["T1"] / 1000
        rescaled["T2"] = label["T2"] / 1000
        scaled.append(rescaled)
    return scaled


def get_figure_for_callback(
    modespyec_option,
    tmin_value,
    tmax_value,
    blocksize,
    blockstride,
    nfft,
    nfsmooth,
    coh_min,
    eps_int,
    shot_id,
    pow_frac,
    t1,
    t2,
    gz_t1,
    gz_t2,
    selection_toggle,
    display_labels_toggle,
    labels_table,
    proposed_labels,
    labels_table_selected_rows,
    figure_rendered,
    label_value_combinations,
    data_coordinator,
    theme_value,
):
    """
    Entry point used by the GUI callback to obtain the modespyec figure.

    Label times, the selection range and the global-zoom range are divided by
    1000 before use (the figure's time axis is in seconds). If the figure is
    already rendered and the trigger only needs a partial update, a patch is
    returned; otherwise the figure is rebuilt from the probe data.

    Args:
        modespyec_option: Graph type, see `make_modespyec_graph`.
        tmin_value, tmax_value, blocksize, blockstride, nfft, nfsmooth,
            coh_min, eps_int, pow_frac: Analysis parameters forwarded to
            `make_modespyec_graph`.
        shot_id: Shot number.
        t1, t2: Selection bounds, parsed with `data_coordinator.parse_value`.
        gz_t1, gz_t2: Global-zoom bounds, parsed the same way.
        selection_toggle: Whether selection mode is on.
        display_labels_toggle: Whether label rectangles are drawn.
        labels_table: List of label dicts with "T1"/"T2", or None.
        proposed_labels: List of proposed label dicts, or None.
        labels_table_selected_rows: Selected row indices of the labels table.
        figure_rendered: True once the figure exists on the client.
        label_value_combinations: Passed through for label coloring.
        data_coordinator: Provides `parse_value` and data access.
        theme_value: Forwarded to the theme helpers.

    Returns:
        On the partial-update path, the tuple ``(patch_or_no_update, no_update)``;
        on the rebuild path, the figure itself.
        NOTE(review): the two paths return different shapes -- confirm which
        one the custom graph API expects.
    """
    # Work on rescaled copies so the caller's label dictionaries stay untouched.
    calibrated_labels = _scale_label_times(labels_table)
    proposed_calibrated_labels = _scale_label_times(proposed_labels)

    selection_on = selection_toggle
    display_labels_on = display_labels_toggle

    def _parse_and_scale(raw):
        parsed = data_coordinator.parse_value(raw)
        return None if parsed is None else parsed / 1000

    calibrated_t1 = _parse_and_scale(t1)
    calibrated_t2 = _parse_and_scale(t2)
    calibrated_gz_t1 = _parse_and_scale(gz_t1)
    calibrated_gz_t2 = _parse_and_scale(gz_t2)

    # Only the first selected table row is highlighted.
    selected_labels = None
    if (
        len(calibrated_labels) > 0
        and labels_table_selected_rows is not None
        and len(labels_table_selected_rows) > 0
    ):
        selected_row_number = labels_table_selected_rows[0]
        if len(calibrated_labels) > selected_row_number:
            selected_labels = [calibrated_labels[selected_row_number]]

    if should_update_shapes(context=ctx, figure_rendered=figure_rendered):
        result_for_update = update_figure_for_trigger(
            selected_labels=selected_labels,
            t1=calibrated_t1,
            t2=calibrated_t2,
            selection_toggle=selection_on,
            display_labels_toggle=display_labels_on,
            label_data=calibrated_labels,
            proposed_labels=proposed_calibrated_labels,
            context=ctx,
            gz1=calibrated_gz_t1,
            gz2=calibrated_gz_t2,
            label_value_combinations=label_value_combinations,
            data_coordinator=data_coordinator,
            theme_value=theme_value,
        )
        if result_for_update is not None:
            return result_for_update, no_update

    window = "Hamming"
    figure = make_modespyec_graph(
        graph_type=modespyec_option,
        shot=shot_id,
        block_size=blocksize,
        block_stride=blockstride,
        nfft=nfft,
        window=window,
        nf_smooth=nfsmooth,
        t_min=tmin_value,
        t_max=tmax_value,
        coh_min=coh_min,
        eps_int=eps_int,
        data_coordinator=data_coordinator,
        pow_frac=pow_frac,
        theme_value=theme_value,
    )
    config = LabelShapeConfig(
        selected_labels=selected_labels,
        t1=calibrated_t1,
        t2=calibrated_t2,
        trim_t1=None,
        trim_t2=None,
        selection_toggle_on=selection_on,
        display_labels=display_labels_on,
        label_data=calibrated_labels,
        # Same rescaled proposed labels as on the partial-update path.
        proposed_labels_data=proposed_calibrated_labels,
        label_value_combinations=label_value_combinations,
    )
    shapes = get_shapes(config)
    figure.update_layout(
        xaxis={"range": [calibrated_gz_t1, calibrated_gz_t2]},
        dragmode="select",
        margin_l=8,
        margin_r=8,
        shapes=shapes,
    )
    return figure
def get_modespyec_theme_template(theme_type="dark"):
    """
    Pick the Plotly template matching a theme name.

    "sophelio_dark" and "refined_gray" map to the dark template; every other
    value -- including the default "dark" -- maps to the light template.
    """
    dark_theme_names = ("sophelio_dark", "refined_gray")
    uses_dark = theme_type in dark_theme_names
    return create_dark_template() if uses_dark else create_light_template()
def create_light_template():
    """Build the light Plotly template: a dict holding a single "layout" entry."""
    text_color = "rgba(33, 37, 41, 0.85)"
    muted_color = "rgba(33, 37, 41, 0.65)"
    hairline = "rgba(0, 0, 0, 0.1)"
    background = "#f1f3f5"  # --bg-primary
    panel = "#e9ecef"
    mono_family = "Cascadia Mono, Cascadia Mono PL, monospace"

    def axis_style(**grid_options):
        # Fresh dictionaries on every call so the two axes share no state.
        style = {
            "title": {"font": {"color": text_color}},
            "color": text_color,
            "linecolor": hairline,
            "gridcolor": hairline,
            "zerolinecolor": hairline,
            "tickcolor": muted_color,
            "tickfont": {"color": muted_color},
        }
        style.update(grid_options)
        style["automargin"] = True
        return style

    # Centered title, placed high with bottom padding to avoid overlap.
    title_style = {
        "font": {"color": text_color, "family": mono_family},
        "x": 0.5,
        "xanchor": "center",
        "y": 0.95,
        "yanchor": "top",
        "pad": {"b": 20},
    }

    layout = go.Layout(
        font={"color": text_color, "family": mono_family},
        paper_bgcolor=background,
        plot_bgcolor=background,
        title=title_style,
        # Only the y-axis shows a (dotted) grid in the light theme.
        xaxis=axis_style(showgrid=False),
        yaxis=axis_style(showgrid=True, gridwidth=1, griddash="dot"),
        legend={"font": {"color": text_color}, "bgcolor": panel, "bordercolor": hairline},
        modebar={"bgcolor": panel, "color": text_color, "activecolor": "#0056b3"},
        hoverlabel={"bgcolor": panel, "bordercolor": hairline, "font": {"color": text_color}},
    )
    return {"layout": layout}
def create_dark_template():
    """Build the dark Plotly template: a dict holding a single "layout" entry."""
    text_color = "rgba(255, 255, 255, 0.85)"
    muted_color = "rgba(255, 255, 255, 0.65)"
    hairline = "rgba(255, 255, 255, 0.1)"
    background = "#222327"  # main and plot-area background
    panel = "#16171b"
    mono_family = "Cascadia Mono, Cascadia Mono PL, monospace"

    def axis_style():
        # Both axes use the same dotted grid; build a fresh dict per axis.
        return {
            "title": {"font": {"color": text_color}},
            "color": text_color,
            "linecolor": hairline,
            "gridcolor": hairline,
            "zerolinecolor": hairline,
            "tickcolor": muted_color,
            "tickfont": {"color": muted_color},
            "showgrid": True,
            "gridwidth": 1,
            "griddash": "dot",
            "automargin": True,
        }

    # Centered title, placed high with bottom padding to avoid overlap.
    title_style = {
        "font": {"color": text_color, "family": mono_family},
        "x": 0.5,
        "xanchor": "center",
        "y": 0.95,
        "yanchor": "top",
        "pad": {"b": 20},
    }

    layout = go.Layout(
        font={"color": text_color, "family": mono_family},
        paper_bgcolor=background,
        plot_bgcolor=background,
        title=title_style,
        xaxis=axis_style(),
        yaxis=axis_style(),
        legend={"font": {"color": text_color}, "bgcolor": panel, "bordercolor": hairline},
        modebar={"bgcolor": panel, "color": text_color, "activecolor": "#007bff"},
        hoverlabel={"bgcolor": panel, "bordercolor": hairline, "font": {"color": text_color}},
    )
    return {"layout": layout}
@dataclass
class LabelShapeConfig:
    """Inputs needed by `get_shapes` to draw label and selection rectangles."""

    # Labels to highlight, or None when nothing is selected.
    selected_labels: Optional[List]
    # Bounds of the user selection rectangle; None when no range is set.
    t1: Optional[float]
    t2: Optional[float]
    # Optional trim bounds; labels outside them are skipped.
    trim_t1: Optional[float]
    trim_t2: Optional[float]
    # Whether the selection rectangle is drawn.
    selection_toggle_on: bool
    # Whether label rectangles are drawn at all.
    display_labels: bool
    # Label dicts carrying "T1"/"T2"; None is tolerated.
    label_data: Optional[List]
    # Proposed label dicts carrying "T1"/"T2"; None is tolerated.
    proposed_labels_data: Optional[List]
    # Color lookup used by get_color_for_label.
    # NOTE(review): it is iterated there as a sequence of dicts with
    # "label_classes"/"color" -- confirm whether `Dict` is the right annotation.
    label_value_combinations: Dict
    # Read by get_shapes to parse trim values. Optional so that existing
    # constructor calls, which never pass it, keep working.
    data_coordinator: Optional[object] = None
def get_shapes(config: LabelShapeConfig):
    """
    Build the Plotly layout shapes for labels, proposed labels and selection.

    Each rectangle spans the full plot height (``yref="paper"``, 0..1) between
    two x-values.

    Args:
        config: Settings describing which labels and selection to draw.

    Returns:
        A list of shape dictionaries: regular labels first, then proposed
        labels, then the selection rectangle (if selection mode is on and both
        bounds are set). A label whose border style marks it as selected also
        gets ``"editable": True``.
    """
    # The configuration may or may not carry a data coordinator; never assume
    # the attribute exists.
    coordinator = getattr(config, "data_coordinator", None)

    def _parse_trim(value):
        if value is None:
            return None
        if coordinator is None:
            # NOTE(review): without a coordinator the trim value is used as
            # given -- confirm that unparsed values are comparable here.
            return value
        return coordinator.parse_value(value)

    def _full_height_rect(x0, x1, fillcolor, line):
        return {
            "type": "rect",
            "xref": "x",
            "yref": "paper",
            "x0": x0,
            "y0": 0,
            "x1": x1,
            "y1": 1,
            "fillcolor": fillcolor,
            "opacity": 0.5,
            "line": line,
        }

    trim_1_value = _parse_trim(config.trim_t1)
    trim_2_value = _parse_trim(config.trim_t2)

    shapes = []
    if config.display_labels:
        # (labels, is_proposed, border color that marks a selected label)
        label_groups = (
            (config.label_data, False, "yellow"),
            (config.proposed_labels_data, True, "white"),
        )
        for labels, is_proposed, selected_color in label_groups:
            if labels is None:
                continue
            for label in labels:
                x0, x1 = adjust_values(x0=label["T1"], x1=label["T2"])
                if label_out_of_bounds(x0, x1, trim_1_value, trim_2_value):
                    continue
                border_style = get_border_style_for(label, config.selected_labels, is_proposed=is_proposed)
                shape = _full_height_rect(
                    x0, x1, get_color_for_label(label, config.label_value_combinations), border_style
                )
                if border_style["color"] == selected_color:
                    shape["editable"] = True
                shapes.append(shape)

    if config.selection_toggle_on and config.t1 is not None and config.t2 is not None:
        shapes.append(
            _full_height_rect(config.t1, config.t2, "LightSkyBlue", {"color": "black", "width": 2})
        )
    return shapes
def adjust_values(x0, x1, is_date=False):
    """
    Widen a zero-width numeric interval so that it stays visible.

    Args:
        x0: Interval start.
        x1: Interval end.
        is_date: Accepted because `get_shapes` passes it for proposed labels;
            currently ignored. NOTE(review): confirm whether date values need
            their own widening rule.

    Returns:
        ``(x0 - 1, x1 + 1)`` when both values are int/float and equal,
        otherwise ``(x0, x1)`` unchanged.
    """
    both_numeric = isinstance(x0, (int, float)) and isinstance(x1, (int, float))
    if both_numeric and x0 == x1:
        return x0 - 1, x1 + 1
    return x0, x1
def label_out_of_bounds(start_x, end_x, trim_1_value, trim_2_value):
    """
    Tell whether a label reaches outside the trimmed data range.

    A bound that is None is not checked. The label is out of bounds when it
    starts before `trim_1_value` or ends after `trim_2_value`; values are
    compared directly.
    """
    if trim_1_value is not None and start_x < trim_1_value:
        return True
    return trim_2_value is not None and end_x > trim_2_value
def get_border_style_for(input_label, selected_labels, is_proposed=False):
    """
    Return the border (line) style for a label rectangle.

    Proposed labels get a dashed border of width 4; regular labels a solid
    border of width 2. A label equal to one of `selected_labels` uses the
    highlight color ("white" for proposed, "yellow" otherwise) instead of the
    base color.
    """
    if is_proposed:
        width, dash = 4, "dash"
        base_color, highlight_color = "var(--accent-color)", "white"
    else:
        width, dash = 2, "solid"
        base_color, highlight_color = "var(--text-secondary)", "yellow"

    is_selected = selected_labels is not None and any(
        input_label == candidate for candidate in selected_labels
    )
    color = highlight_color if is_selected else base_color
    return {"color": color, "width": width, "dash": dash}
def get_color_for_label(input_label, label_value_combinations):
    """
    Look up the fill color for a label from its active classes.

    The active classes are the label's keys -- other than "T1", "T2", "Shot"
    and "Dataset" -- whose value is exactly True, in the label's key order.
    The first combination whose "label_classes" equals that list supplies the
    color; "#008000" is used when none matches.
    """
    bookkeeping_keys = ("T1", "T2", "Shot", "Dataset")
    active_classes = [
        key for key, value in input_label.items() if key not in bookkeeping_keys and value is True
    ]
    for combination in label_value_combinations:
        if "label_classes" not in combination:
            continue
        if active_classes == combination["label_classes"]:
            return combination["color"]
    return "#008000"
Finally, a windowed sliding FFT algorithm is required to complete the graph definitions.
=== "Python"
"""
Windowed sliding FFT of two signals x1, x2.
The analysis window has length = blocksize.
The FFT has nfft >= blocksize samples.
Each analysis is separated by blockstride samples.
The time vector t is assumed to have a constant delta.
"""
import numpy as np
# pylint: disable=too-many-locals
def wsfft_paired_signal(
    t: np.array,
    x1: np.array,
    x2: np.array,
    blocksize: int,
    blockstride: int,
    nfft: int,
    win_type: str,
    nfsmooth: int,
) -> dict:
    """
    Windowed sliding FFT of two equally sampled signals.

    Args:
        t: 1-D time vector, assumed to have a constant step.
        x1, x2: 1-D signals with the same length as `t`.
        blocksize: Analysis window length in samples.
        blockstride: Number of samples between consecutive windows.
        nfft: FFT length; must be even and >= `blocksize`.
        win_type: Window name understood by `get_window_weights`.
        nfsmooth: Odd number (> 2) of frequency bins for boxcar smoothing.

    Returns:
        A dict with, per block (columns) and frequency bin (rows):
        "X11"/"X22" auto-spectra, "X12" cross-spectrum, "SX11"/"SX22"/"SX12"
        their frequency-smoothed versions and "SC12" the coherence computed
        from the smoothed spectra; plus "tmid" (block mid-times), "freq"
        (the nfft // 2 bin frequencies), "deltat", "deltaf", "fsmooth",
        "ff" (RMS normalisation factor) and "c95" (coherence threshold).

    Raises:
        AssertionError: If the inputs violate the constraints above.
    """
    assert len(t.shape) == 1
    assert len(x1.shape) == 1
    assert len(x2.shape) == 1
    n = x1.shape[0]
    assert x2.shape[0] == n and t.shape[0] == n
    assert blocksize > 0 and blockstride > 0, "blocksize and blockstride must be positive"
    assert nfft >= blocksize
    assert nfft % 2 == 0, "please use even number for nfft"
    assert nfsmooth > 2, "nfsmooth must be > 2"
    assert (nfsmooth - 1) % 2 == 0, "please use odd number for nfsmooth"

    # Check the sampling step before dividing by it.
    ts = np.mean(np.diff(t))
    assert ts > 0
    fs = 1.0 / ts

    # The last admissible block starts at n - blocksize (it ends exactly at the
    # last sample); np.arange excludes its stop value, hence the + 1.
    block_start = np.arange(0, n - blocksize + 1, blockstride)
    nb = len(block_start)

    nffth = nfft // 2
    freq = (np.arange(nffth) / nffth) * (fs / 2.0)
    w = get_window_weights(win_type, blocksize)
    pw = np.sum(w * w) / blocksize
    # Factor ff is used like this: RMS(.) = sqrt( ff * sum_over_freq (windowed_fft_vector) )
    ff = 2 / (pw * blocksize * nfft)

    t_mid = np.zeros((nb,))
    x11 = np.zeros((nffth, nb))
    x22 = np.zeros((nffth, nb))
    x12 = np.zeros((nffth, nb), dtype=complex)  # cross-spectrum
    sx11 = np.copy(x11)
    sx22 = np.copy(x22)
    sx12 = np.copy(x12)
    sc12 = np.copy(x11)  # real-valued

    for b, i1 in enumerate(block_start):
        i2 = i1 + blocksize
        assert i2 <= n
        t_mid[b] = (t[i1] + t[i2 - 1]) / 2
        # Remove the block mean, apply the window, keep the lower half of the FFT.
        x1_block = x1[i1:i2]
        x2_block = x2[i1:i2]
        x1b = np.fft.fft(w * (x1_block - np.mean(x1_block)), n=nfft)[:nffth]
        x2b = np.fft.fft(w * (x2_block - np.mean(x2_block)), n=nfft)[:nffth]
        x11[:, b] = np.real(x1b * np.conj(x1b))
        x22[:, b] = np.real(x2b * np.conj(x2b))
        x12[:, b] = x1b * np.conj(x2b)
        sx11[:, b] = smooth(x11[:, b], nfsmooth)
        sx22[:, b] = smooth(x22[:, b], nfsmooth)
        sx12[:, b] = smooth(x12[:, b], nfsmooth)
        sc12[:, b] = np.real(sx12[:, b] * np.conj(sx12[:, b])) / sx11[:, b] / sx22[:, b]

    # modespyec's relationship btw. "nsmooth" and "fsmooth"
    df = 1.0 / (blocksize * ts)
    fsmooth = 2 * df * ((nfsmooth - 1) / 2)
    # modespyec's calc (verbatim) of the "c95" value (used as coherence threshold)
    c95 = 1.96 / np.sqrt(2.0 * nfsmooth - 2.0)
    c95 = (np.exp(c95) - np.exp(-c95)) / (np.exp(c95) + np.exp(-c95))
    c95 **= 2

    return {
        "tmid": t_mid,
        "freq": freq,
        "X11": x11,
        "X22": x22,
        "X12": x12,
        "SX11": sx11,
        "SX22": sx22,
        "SX12": sx12,
        "SC12": sc12,
        "deltat": blocksize * ts,
        "deltaf": df,
        "fsmooth": fsmooth,
        "ff": ff,
        "c95": c95,
    }
def get_mode_map(
    m: dict,
    delta_theta: float,
    coh_min: float,
    no_value: float = np.nan,
    p_frac: float = None,
    q_high: float = 0.99,
) -> np.array:
    """
    Build the map of mode numbers from the smoothed cross-spectrum.

    The mode number is the negated cross-phase (in degrees) divided by
    `delta_theta`. Cells whose coherence is below `coh_min` are set to
    `no_value`. If `p_frac` is given, cells whose mean power
    ``(X11 + X22) / 2`` is below ``p_frac`` times the `q_high` quantile of
    that power are set to `no_value` as well.

    Args:
        m: Spectrum dictionary as returned by `wsfft_paired_signal`.
        delta_theta: Divisor applied to the cross-phase in degrees.
        coh_min: Coherence threshold.
        no_value: Filler for masked cells.
        p_frac: Optional power fraction, ``0 < p_frac <= 1``.
        q_high: Quantile used as the power reference.

    Returns:
        Array of (non-rounded) mode numbers with the shape of ``m["SX12"]``.
    """
    deg_per_rad = 180 / np.pi
    nmap = -1 * deg_per_rad * np.angle(m["SX12"]) / delta_theta
    low_coherence = m["SC12"] < coh_min
    nmap[low_coherence] = no_value

    if p_frac is None:
        return nmap

    assert 0.0 < p_frac <= 1.0
    mean_power = (m["X11"] + m["X22"]) / 2
    power_floor = p_frac * np.quantile(mean_power, q=q_high)
    nmap[mean_power < power_floor] = no_value
    return nmap
def get_amplitude(
    m: dict,
    n_signed: list,
    delta_theta: float,
    coh_min: float = 0.98,
    eps_int: float = 0.10,
) -> dict:
    """
    Estimate one RMS amplitude trace per requested mode number.

    For every block, only the frequency bins whose coherence is at least
    `coh_min` and whose mode number (negated cross-phase in degrees divided by
    `delta_theta`) lies within `eps_int` of the target contribute.

    Args:
        m: Spectrum dictionary as returned by `wsfft_paired_signal`.
        n_signed: Mode numbers for which to estimate RMS amplitudes.
        delta_theta: Divisor that was used to compute the mode numbers.
        coh_min: Minimum coherence.
        eps_int: Tolerance for integer-ness.

    Returns:
        Dict mapping each mode number to an array with one value per block.
    """
    deg_per_rad = 180.0 / np.pi
    n_blocks = m["tmid"].shape[0]

    def rms_trace(n_target):
        trace = np.zeros((n_blocks,))
        for b in range(n_blocks):
            coherent = m["SC12"][:, b] >= coh_min
            off_target = np.abs(-deg_per_rad * np.angle(m["SX12"][:, b]) / delta_theta - n_target) > eps_int
            # 1.0 where the bin is coherent and on target, 0.0 elsewhere.
            w = (coherent & ~off_target).astype(float)
            trace[b] = np.sqrt(m["ff"] * np.sum((m["X11"][:, b] * w + m["X22"][:, b] * w)) / 2)
        return trace

    return {n: rms_trace(n) for n in n_signed}
def get_color(n: int) -> str:
    """
    Map a mode number to modespyec's color for it.

    Returns a CSS color name (also valid for matplotlib) for n in -5..5 and
    "grey" for anything else.
    SEE: https://matplotlib.org/stable/gallery/color/named_colors.html
    """
    non_negative = ("black", "red", "yellow", "green", "blue", "orange")  # n = 0 .. 5
    negative = ("deepskyblue", "khaki", "violet", "cyan", "lightgreen")  # n = -1 .. -5
    palette = dict(zip(range(0, 6), non_negative))
    palette.update(zip(range(-1, -6, -1), negative))
    return palette.get(n, "grey")
def get_window_weights(name: str, blocksize: int) -> np.array:
    """
    Return the analysis window weights for a block.

    Args:
        name: Window name, case-insensitive: "hann", "hamming", "bh"
            (four-term cosine sum) or "boxcar".
        blocksize: Number of samples in the window.

    Returns:
        Array of `blocksize` weights. The cosine windows are evaluated at
        ``2 * pi * k / blocksize`` for ``k = 0 .. blocksize - 1``.

    Raises:
        NotImplementedError: For any other window name.
    """
    nvec = np.arange(blocksize)
    l_name = name.lower()
    if l_name == "hann":
        return 0.5 * (1 - np.cos(2 * np.pi * nvec / blocksize))
    if l_name == "hamming":
        a1 = 0.5400
        b1 = 1 - a1
        return a1 - b1 * np.cos(2 * np.pi * nvec / blocksize)
    if l_name == "bh":
        a = 2 * np.pi * nvec / blocksize
        return 0.35875 - 0.48829 * np.cos(a) + 0.14128 * np.cos(2.0 * a) - 0.01168 * np.cos(3.0 * a)
    if l_name == "boxcar":
        return np.ones((blocksize,))
    raise NotImplementedError(
        f"unsupported window type {name!r}; expected one of 'hann', 'hamming', 'bh', 'boxcar'"
    )
def smooth(x: np.array, npts: int) -> np.array:
    """
    Smooth a 1-D signal with a boxcar (moving average) of `npts` points.

    The full convolution is trimmed by ``(npts - 1) // 2`` samples at each
    end, so for odd `npts` the result has the same length as `x`.
    """
    assert len(x.shape) == 1
    half_width = (npts - 1) // 2
    assert half_width >= 1
    kernel = np.ones((npts,)) / npts
    padded = np.convolve(x, kernel, mode="full")
    return padded[half_width:-half_width]
The result is a sophisticated 2D graph that works on any dataset that includes the appropriate probe data in the 'data_folder' path, and that is completely interoperable with the dFL workflow.