Skip to content

utils.py

Collect package helper functions.

add_grid_lines(buses, statistic)

Add a column with gridlines to a statistic.

Parameters:

Name Type Description Default
buses pandas.DataFrame

The Bus component data frame from a pypsa network.

required
statistic pandas.Series

A pandas object with a MultiIndex. The index must contain a "bus0" and a "bus1" level that hold the node names.

required

Returns:

Type Description
pandas.DataFrame

A data frame with an additional "line" column that holds the coordinate pairs (in y, x order) of the respective bus0 and bus1 locations.

Source code in evals/utils.py
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
def add_grid_lines(buses: pd.DataFrame, statistic: pd.Series) -> pd.DataFrame:
    """
    Attach a "line" column with bus-to-bus coordinates to a statistic.

    Parameters
    ----------
    buses
        The Bus component data frame from a pypsa network. Only the rows
        selected by ``filter_by(buses, carrier="AC")`` are used for the
        coordinate lookup; their "x" and "y" columns are read.
    statistic
        A pandas object with a MultiIndex that contains a "bus0" and a
        "bus1" level holding the node names. A Series is converted to a
        data frame first; a data frame receives the new column in place.

    Returns
    -------
    :
        The statistic as a data frame with an additional "line" column.
        Each entry is a list of ``[y, x]`` coordinate lists, looked up
        for the bus0 node first and the bus1 node second.
    """
    if isinstance(statistic, pd.Series):
        statistic = statistic.to_frame()

    # node names are stripped from surrounding whitespace before the lookup
    start_nodes = statistic.index.get_level_values("bus0").str.strip()
    end_nodes = statistic.index.get_level_values("bus1").str.strip()

    # the coordinate lookup is restricted to buses with carrier "AC"
    coordinates = filter_by(buses, carrier="AC")[["x", "y"]]

    # one [[y0, x0], [y1, x1]] entry per row of the statistic
    statistic["line"] = [
        coordinates.loc[[start, end]][["y", "x"]].values.tolist()
        for start, end in zip(start_nodes, end_nodes, strict=True)
    ]

    return statistic

align_edge_directions(df, lvl0='bus0', lvl1='bus1')

Align the directionality of edges between two nodes.

Parameters:

Name Type Description Default
df pandas.DataFrame

The input data frame with a multiindex.

required
lvl0 str

The first MultiIndex level name to swap values.

'bus0'
lvl1 str

The second MultiIndex level name to swap values.

'bus1'

Returns:

Type Description
pandas.DataFrame

The input data frame with aligned edge directions between the nodes in lvl1 and lvl0.

Source code in evals/utils.py
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
def align_edge_directions(
    df: pd.DataFrame, lvl0: str = "bus0", lvl1: str = "bus1"
) -> pd.DataFrame:
    """
    Align the directionality of edges between two nodes.

    The data is grouped by the (lvl0, lvl1) value pairs. The first
    orientation encountered for an unordered node pair is kept; a group
    with the reversed orientation gets its lvl0 and lvl1 values
    exchanged so that both groups point in the same direction.

    Parameters
    ----------
    df
        The input data frame with a multiindex.
    lvl0
        The first MultiIndex level name to swap values.
    lvl1
        The second MultiIndex level name to swap values.

    Returns
    -------
    :
        The input data frame with aligned edge directions between the
        nodes in lvl0 and lvl1.
    """
    # Unordered node pairs that were already encountered. Frozensets are
    # hashable, which gives constant-time membership tests instead of a
    # linear scan through a list of sets.
    seen: set[frozenset] = set()

    def _reverse_values_if_seen(df_slice: pd.DataFrame) -> pd.DataFrame:
        """
        Reverse index levels if they have a duplicated permutation.

        Parameters
        ----------
        df_slice
            A slice of a data frame with the lvl0 and lvl1 index level.

        Returns
        -------
        :
            The slice with exchanged level values if the unordered
            combination of lvl0 and lvl1 was seen before, and the
            original slice otherwise.
        """
        pair = frozenset(
            (df_slice.index.unique(lvl0)[0], df_slice.index.unique(lvl1)[0])
        )
        if pair not in seen:
            seen.add(pair)
            return df_slice

        reversed_slice = df_slice.swaplevel(lvl0, lvl1)
        # keep original names since we only want to swap values
        reversed_slice.index.names = df_slice.index.names
        return reversed_slice

    return df.groupby([lvl0, lvl1], group_keys=False).apply(
        _reverse_values_if_seen,
    )

apply_cutoff(df, limit, drop=True)

Replace small absolute values with NaN.

The limit boundary is not inclusive, i.e. the limit value itself will not be replaced by NaN.

Parameters:

Name Type Description Default
df pandas.DataFrame

The data frame to remove values from.

required
limit float

Absolute values smaller than the limit will be dropped.

required
drop bool

Whether to drop all NaN rows from the returned data frame.

True

Returns:

Type Description
pandas.DataFrame

A data frame without values that are smaller than the limit.

Source code in evals/utils.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def apply_cutoff(df: pd.DataFrame, limit: float, drop: bool = True) -> pd.DataFrame:
    """
    Blank out entries whose magnitude is below a threshold.

    The comparison is strict: an entry whose absolute value equals the
    absolute limit is kept.

    Parameters
    ----------
    df
        The data frame to remove values from.
    limit
        The threshold; its absolute value is compared against the
        absolute values in the data frame.
    drop
        Whether to remove rows that consist of missing values only from
        the returned data frame.

    Returns
    -------
    :
        A data frame in which values below the limit are replaced by
        missing values.
    """
    threshold = abs(limit)
    below_threshold = df.abs().lt(threshold)
    masked = df.mask(below_threshold, other=pd.NA)
    if not drop:
        return masked
    return masked.dropna(how="all", axis=0)

build_plot_config(global_cfg)

Build a plot configuration namespace from the TOML global config dict.

Scalar and boolean settings are read directly from global_cfg without fallback defaults; the only exception is cutoff, which falls back to 0.0001 when the key is absent. If any other required key is missing, a KeyError is raised immediately so that misconfigurations surface loudly rather than silently producing incorrect output.

Complex values that cannot be expressed in TOML (chart class references, colour/pattern dicts, empty per-view dicts) are set here using Python constants. View-specific overrides (plotby, pivot_index, etc.) are applied in the individual view functions after the namespace is constructed.

Parameters:

Name Type Description Default
global_cfg dict

The [global] section of the merged TOML configuration, as returned by :func:~evals.fileio.read_views_config.

required

Returns:

Type Description
types.SimpleNamespace

A :class:~types.SimpleNamespace with the same attribute names as the former PlotConfig dataclass.

Raises:

Type Description
KeyError

If a required key is absent from global_cfg.

Source code in evals/utils.py
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
def build_plot_config(global_cfg: dict) -> SimpleNamespace:
    """
    Assemble the plot configuration namespace from the global TOML config.

    Scalar and boolean settings are looked up in *global_cfg* with plain
    item access, so a missing key raises a :class:`KeyError` right away.
    The only exception is ``cutoff``, which falls back to ``0.0001``.

    Values that are not taken from *global_cfg* (chart class, colour and
    pattern dicts, empty per-view containers, data model defaults) are
    filled in from Python constants. View-specific overrides
    (``plotby``, ``pivot_index``, etc.) are expected to be applied by
    the individual views after the namespace was built.

    Parameters
    ----------
    global_cfg
        The ``[global]`` section of the merged TOML configuration, as
        returned by :func:`~evals.fileio.read_views_config`.

    Returns
    -------
    :
        A :class:`~types.SimpleNamespace` with the same attribute names
        as the former ``PlotConfig`` dataclass.

    Raises
    ------
    KeyError
        If a required key is absent from *global_cfg*.
    """
    hatched_groups = (
        Group.import_foreign,
        Group.export_foreign,
        Group.import_domestic,
        Group.export_domestic,
        Group.import_net,
        Group.export_net,
        Group.import_global,
    )

    # The insertion order defines the attribute order of the namespace.
    settings = {
        # --- title & file naming (overwritten by Exporter per view) ---
        "title": None,
        "file_name_template": global_cfg["file_name_template"],
        "unit": "",  # default is metric.df.attrs["unit"] at render time
        # --- database upload attributes (overwritten by Exporter per view) ---
        "database_plot_type": "",
        "database_specifier": "",
        "database_bus_carrier": "",
        # --- chart class (resolved to a class by Exporter.export()) ---
        "chart": None,
        # --- data model / pivot defaults (overwritten in view code per view) ---
        "plotby": [DataModel.LOCATION],
        "pivot_index": list(DataModel.YEAR_IDX_NAMES),
        "pivot_columns": [],
        "plot_category": DataModel.CARRIER,
        "plot_xaxis": DataModel.YEAR,
        "facet_column": DataModel.BUS_CARRIER,
        # --- view-level overrides set per-view (empty by default) ---
        "category_orders": (),
        "fill": {},
        "line_dash": {},
        "line_width": {},
        # --- complex defaults from Python constants ---
        "colors": dict(COLOUR_SCHEME),
        "pattern": dict.fromkeys(hatched_groups, "/"),
    }

    # --- scalar / boolean defaults sourced from TOML [global] ---
    for key in (
        "stacked",
        "line_shape",
        "legend_header",
        "xaxis_title",
        "yaxis_color",
    ):
        settings[key] = global_cfg[key]

    settings["footnotes"] = tuple(global_cfg["footnotes"])
    # overwritten per-view; toml has no view-level default
    settings["cutoff"] = global_cfg.get("cutoff", 0.0001)

    for key in (
        "cutoff_drop",
        "legend_font_size",
        "title_font_size",
        "font_size",
        "xaxis_font_size",
        "yaxes_showgrid",
        "yaxes_visible",
    ):
        settings[key] = global_cfg[key]

    return SimpleNamespace(**settings)

calculate_input_share(df, bus_carrier, apply_scaling=True)

Calculate the withdrawal necessary to supply energy for requested bus_carrier.

Each technology's demand rows are weighted by the output share that lands on the requested bus_carrier. An optional input/output scaling step converts those input-side magnitudes into the equivalent output-side magnitudes; see apply_scaling below.

Parameters:

Name Type Description Default
df pandas.DataFrame | pandas.Series

The input DataFrame or Series with a MultiIndex.

required
bus_carrier str | list

Calculates the input energy for this bus_carrier.

required
apply_scaling bool

Whether to rescale each demand row by the technology's total_output / total_input ratio (default True, preserving the legacy behaviour).

  • True: the result is expressed in output magnitudes. For a fuel-to-power link this yields the electricity actually produced from the fuel (elec_output). Heat-pump-like links (where output exceeds input) get a virtual ambient heat / latent heat surplus row so that input + surplus matches output.
  • False: the scaling factor is skipped and the result is expressed in input magnitudes. For a fuel-to-power link this yields the fuel input attributable to electricity output (fuel × electricity_fraction). The heat-pump surplus branch is irrelevant in this mode and is therefore skipped.
True

Returns:

Type Description
pandas.DataFrame | pandas.Series

The withdrawal amounts necessary to produce energy of bus_carrier, either in output-side magnitudes (apply_scaling=True) or in input-side magnitudes (apply_scaling=False).

Source code in evals/utils.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
def calculate_input_share(
    df: pd.DataFrame | pd.Series,
    bus_carrier: str | list,
    apply_scaling: bool = True,
) -> pd.DataFrame | pd.Series:
    """
    Calculate the withdrawal needed to supply energy for a bus_carrier.

    The data is grouped by all index levels except "bus_carrier". Within
    each group the negative entries (demand) are weighted by the share of
    the non-negative entries (supply) that belongs to the requested
    ``bus_carrier``. Optionally the demand is additionally rescaled by
    the ratio of total supply to total demand; see *apply_scaling*.

    Parameters
    ----------
    df
        The input DataFrame or Series with a MultiIndex.
    bus_carrier
        Calculates the input energy for this bus_carrier.
    apply_scaling
        Whether to rescale each demand row by the group's
        ``total_output / total_input`` ratio (default ``True``,
        preserving the legacy behaviour).

        - ``True``: the result is expressed in *output* magnitudes. For a
          fuel-to-power link this yields the electricity actually
          produced from the fuel. If the output exceeds the input
          (heat-pump-like links), a virtual ``ambient heat`` /
          ``latent heat`` surplus row is added so that input + surplus
          matches the output.
        - ``False``: the scaling is skipped and the result is expressed
          in *input* magnitudes (``demand × output_share``). No surplus
          rows are added in this mode.

    Returns
    -------
    :
        The withdrawal amounts necessary to produce energy of
        `bus_carrier`, either in output-side magnitudes
        (``apply_scaling=True``) or in input-side magnitudes
        (``apply_scaling=False``). The sign is flipped with respect to
        the negative demand entries of the input.
    """

    def _weighted_withdrawal(group):
        """Return the demand of one group weighted by the output share."""
        withdrawal = group[group < 0]
        production = group[group >= 0]
        total_production = production.sum()
        requested_production = filter_by(production, bus_carrier=bus_carrier).sum()

        # the share accounts for technologies with multiple outputs;
        # division by zero is deliberately silenced here
        with np.errstate(divide="ignore", invalid="ignore"):
            output_share = requested_production / total_production

        if not apply_scaling:
            # input-side magnitudes: demand × output_share only
            return withdrawal * output_share

        # inputs and outputs of a Link are not necessarily equally large
        output_per_input = abs(total_production / withdrawal.sum())
        if output_per_input > 1.0:
            carrier_name = group.index.unique(DataModel.CARRIER).item()
            if "heat pump" in carrier_name:
                surplus_bus_carrier = "ambient heat"
            else:
                surplus_bus_carrier = "latent heat"
            surplus = rename_aggregate(
                withdrawal * (output_per_input - 1),
                surplus_bus_carrier,
                level=DataModel.BUS_CARRIER,
            )
            return pd.concat([withdrawal, surplus]) * output_share

        return withdrawal * output_per_input * output_share

    group_levels = [name for name in df.index.names if name != "bus_carrier"]
    grouped = df.groupby(group_levels, group_keys=False)
    return grouped.apply(_weighted_withdrawal).mul(-1)

combine_statistics(statistics, metric_name, is_unit, to_unit, keep_regions=('AT', 'GB', 'ES', 'FR', 'DE', 'IT'), region_nice_names=True)

Build the metric data frame from statistics.

Parameters:

Name Type Description Default
statistics list

The statistics to combine.

required
metric_name str

The metric name used in plot titles and column labels.

required
is_unit str

The common unit of input statistics.

required
to_unit str

The desired unit of the output metric.

required
keep_regions tuple

A collection of country codes for which original input cluster codes will be included in the metric locations.

('AT', 'GB', 'ES', 'FR', 'DE', 'IT')
region_nice_names bool

Whether to replace location country codes with country/region names.

True

Returns:

Type Description
pandas.DataFrame

The formatted metric in the desired unit and locations.

Source code in evals/utils.py
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
def combine_statistics(
    statistics: list,
    metric_name: str,
    is_unit: str,
    to_unit: str,
    keep_regions: tuple = ("AT", "GB", "ES", "FR", "DE", "IT"),
    region_nice_names: bool = True,
) -> pd.DataFrame:
    """
    Concatenate statistics into one formatted metric data frame.

    Parameters
    ----------
    statistics
        The statistics to combine. They are concatenated with
        ``pandas.concat``.
    metric_name
        The metric name used in plot titles and column labels. It is
        stored in ``attrs["name"]`` of the result.
    is_unit
        The common unit of input statistics.
    to_unit
        The desired unit of the output metric. It is stored in
        ``attrs["unit"]``; the data is rescaled if it is non-empty and
        differs from `is_unit`.
    keep_regions
        A collection of country codes for which original input
        cluster codes will be included in the metric locations.
    region_nice_names
        Whether to replace location country codes with country/region
        names.

    Returns
    -------
    :
        The formatted metric in the desired unit and locations.
    """
    combined = pd.concat(statistics)

    was_series = isinstance(combined, pd.Series)
    if was_series:
        # a Series becomes a single column labelled "<metric> (<unit>)"
        combined = combined.to_frame(f"{metric_name} ({is_unit})")

    combined = _aggregate_locations(combined, keep_regions, region_nice_names)
    combined.attrs.update(name=metric_name, unit=to_unit)

    columns_name = DataModel.METRIC if was_series else DataModel.SNAPSHOTS
    combined.columns.name = columns_name
    if columns_name == DataModel.SNAPSHOTS:
        # snapshot column labels must be parseable as time stamps
        combined.columns = pd.to_datetime(combined.columns, errors="raise")

    needs_conversion = to_unit and (is_unit != to_unit)
    if needs_conversion:
        combined = scale(combined, to_unit=to_unit)

    return _split_trade_saldo_to_netted_import_export(combined)

custom_sort(df, by, values, ascending=False)

Sort a data frame by the first appearance in values.

Parameters:

Name Type Description Default
df pandas.DataFrame

The dataframe to sort.

required
by str

The column name to find values in.

required
values tuple

The values to sort by. The order in this collection defines the sort result.

required
ascending bool

Whether to reverse the result (Plotly inserts legend items from top down).

False

Returns:

Type Description
pandas.DataFrame

The sorted data frame.

Source code in evals/utils.py
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
def custom_sort(
    df: pd.DataFrame, by: str, values: tuple, ascending: bool = False
) -> pd.DataFrame:
    """
    Sort a data frame by the first appearance in *values*.

    Entries of the `by` column that are not listed in *values* share a
    rank behind all listed values.

    Parameters
    ----------
    df
        The dataframe to sort.
    by
        The column name to find values in.
    values
        The values to sort by.  The order in this collection defines
        the sort result. If a value is listed more than once, its first
        position counts.
    ascending
        Whether to reverse the result (Plotly inserts legend items from
        top down).

    Returns
    -------
    :
        The sorted data frame, or the unchanged input if *values* is
        empty.
    """
    if not values:
        return df

    # dict.fromkeys() keeps only the first occurrence of duplicated
    # entries, so the rank really reflects the first appearance.
    rank = {value: position for position, value in enumerate(dict.fromkeys(values))}
    # unknown entries are ranked behind every listed value, regardless of
    # how many values were passed
    unknown_rank = len(rank)

    def _custom_order(ser: pd.Series) -> pd.Series:
        """Translate the column entries into their sort rank."""
        return ser.apply(lambda item: rank.get(item, unknown_rank))

    return df.sort_values(by=by, key=_custom_order, ascending=ascending)

drop_from_multtindex_by_regex(df, pattern, level=DataModel.CARRIER)

Drop all rows that match the regex in the index level.

This function is needed, because pandas.DataFrame.filter cannot be applied to MultiIndexes.

Parameters:

Name Type Description Default
df pandas.DataFrame

The input data frame with a multi index.

required
pattern str

The regular expression pattern as a raw string.

required
level str

The multi index level to match the regex to.

evals.constants.DataModel.CARRIER

Returns:

Type Description
pandas.DataFrame | pandas.Series

The input data where the regular expression does not match.

Source code in evals/utils.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
def drop_from_multtindex_by_regex(
    df: pd.DataFrame, pattern: str, level: str = DataModel.CARRIER
) -> pd.DataFrame | pd.Series:
    """
    Drop all rows that match the regex in the index level.

    This function is needed, because pandas.DataFrame.filter cannot
    be applied to MultiIndexes.

    Parameters
    ----------
    df
        The input data frame with a multi index.
    pattern
        The regular expression pattern as a raw string. An empty
        pattern returns the input unchanged.
    level
        The multi index level to match the regex to.

    Returns
    -------
    :
        The input data where the regular expression does not match.
        Rows with a missing value in the index level never match and
        are therefore kept.
    """
    if not pattern:
        return df

    # na=False: missing level values would otherwise yield NaN entries in
    # the mask, which cannot be inverted with "~".
    mask = df.index.get_level_values(level).str.contains(
        pattern, regex=True, na=False
    )
    return df[~mask]

filter_by(df, exclude=False, **kwargs)

Filter a data frame by key value pairs.

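Builds a pandas query expression from the key=value pairs, using one membership test (key in values) per pair and combining all pairs with a logical AND. Since the pandas query API is only available for data frames, a passed pandas Series is converted to a data frame for filtering and squeezed back to a Series afterwards.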

Parameters:

Name Type Description Default
df pandas.DataFrame | pandas.Series

The data frame or Series to filter.

required
exclude bool

Set to True to exclude the filter result from the original data set, and return the difference.

False
**kwargs object

Key=value pairs, used in the filter expression. Valid keys are index level names or column labels.

{}

Returns:

Type Description
pandas.DataFrame | pandas.Series

The filtered data frame in the same format as the input dataframe.

Source code in evals/utils.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def filter_by(
    df: pd.DataFrame | pd.Series, exclude: bool = False, **kwargs: object
) -> pd.DataFrame | pd.Series:
    """
    Filter a data frame by key value pairs.

    Constructs a pandas query using the pandas.Index.isin() method.
    Since the pandas query API is only available for data frames,
    any passed pandas Series is converted to frame and reset to
    series.

    Parameters
    ----------
    df
        The data frame or Series to filter.
    exclude
        Set to True to exclude the filter result from the original
        data set, and return the difference.
    **kwargs
        Key=value pairs, used in the filter expression. Valid keys are
        index level names or column labels.

    Returns
    -------
    :
        The filtered data frame in the same format as the input
        dataframe.
    """
    if df.empty:
        return df  # to prevent key errors

    if was_series := isinstance(df, pd.Series):
        df = df.to_frame()

    where_clauses = []
    for key, vals in kwargs.items():
        vals = [vals] if np.isscalar(vals) else vals
        where_clauses.append(f"{key} in {vals}")

    expression = " & ".join(where_clauses)
    result = df.query(expression)

    if exclude:
        result = df.drop(result.index)

    # squeeze(axis=1) to preserve index even for single rows
    return result.squeeze(axis=1) if was_series else result

filter_for_carrier_connected_to(df, bus_carrier)

Return a subset with technologies connected to a bus carrier.

Parameters:

Name Type Description Default
df pandas.DataFrame

The input DataFrame or Series with a MultiIndex.

required
bus_carrier str | list

The bus carrier to filter for.

required

Returns:

Type Description

A subset of the input data that contains all location + carrier combinations that have at least one connection to the requested bus_carrier.

Source code in evals/utils.py
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
def filter_for_carrier_connected_to(df: pd.DataFrame, bus_carrier: str | list):
    """
    Return a subset with technologies connected to a bus carrier.

    The data is grouped by location (or country) and carrier. A group
    counts as connected if its rows for the requested bus_carrier
    contain at least one truthy (non-zero) value. Only the rows of
    connected groups are returned; a carrier that is connected in one
    location is not pulled in for other locations where it is not.

    Parameters
    ----------
    df
        The input DataFrame or Series with a MultiIndex.
    bus_carrier
        The bus carrier to filter for.

    Returns
    -------
    :
        A subset of the input data, in the original row order, that
        contains all location + carrier combinations that have at least
        one connection to the requested bus_carrier.
    """
    if df.empty:
        return df  # nothing to group

    # hotfix to support country groupers
    location_or_country = DataModel.LOCATION
    if "country" in df.index.names:
        location_or_country = "country"

    def _is_connected(data: pd.DataFrame | pd.Series) -> bool:
        """Return True if the group has non-zero data at bus_carrier."""
        connected = filter_by(data, bus_carrier=bus_carrier).any()
        if isinstance(connected, pd.Series):
            # data frame input: any() reduces per column first
            connected = connected.any()
        return bool(connected)

    # GroupBy.filter keeps exactly the rows of the groups that pass the
    # check. Filtering by separate lists of carriers and locations would
    # return their cross product, including unconnected combinations.
    grouped = df.groupby([location_or_country, DataModel.CARRIER])
    return grouped.filter(_is_connected)

get_energy_totals_domestic_share(energy_totals, kind)

Return the domestic share of energy totals for a given kind.

Parameters:

Name Type Description Default
energy_totals pandas.DataFrame

The energy totals data frame filtered to one energy year.

required
kind str

The kind of energy totals to calculate the factor for.

required

Returns:

Type Description
pandas.Series

The share of national aviation or navigation per country.

Source code in evals/utils.py
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
def get_energy_totals_domestic_share(
    energy_totals: pd.DataFrame, kind: str
) -> pd.Series:
    """
    Compute the domestic fraction of the energy totals of one kind.

    The columns "total domestic <kind>" and "total international <kind>"
    are read from the energy totals.

    Parameters
    ----------
    energy_totals
        The energy totals data frame filtered to one energy year.
    kind: {'aviation', 'navigation'}
        The kind of energy totals to calculate the factor for.

    Returns
    -------
    :
        The domestic amount divided by the sum of the domestic and the
        international amount, per row of the energy totals.
    """
    domestic_column = f"total domestic {kind}"
    international_column = f"total international {kind}"
    combined = energy_totals[domestic_column] + energy_totals[international_column]
    return energy_totals[domestic_column] / combined

get_heat_loss_factor(nc)

Return the heat loss factor for district heating from the config.

Parameters:

Name Type Description Default
nc pypsa.NetworkCollection

The loaded networks.

required

Returns:

Type Description
The heat loss factor for district heating networks.
Source code in evals/utils.py
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
def get_heat_loss_factor(nc: NetworkCollection) -> int:
    """
    Return the heat loss factor for district heating from the config.

    The factor is read from
    ``meta["sector"]["district_heating"]["district_heating_loss"]`` of
    every network; all networks must agree on a single value.

    Parameters
    ----------
    nc
        The loaded networks.

    Returns
    -------
    :
        The heat loss factor for district heating networks.
        NOTE(review): the annotation says ``int``; confirm whether the
        configured loss factor is actually a float.

    Raises
    ------
    AssertionError
        If the networks hold different loss factors, or if no network
        was passed.
    """
    heat_loss_factors = {
        n.meta["sector"]["district_heating"]["district_heating_loss"] for n in nc
    }
    # Raised explicitly rather than via an assert statement so that the
    # check is not stripped when Python runs with -O. The exception type
    # is kept for backward compatibility.
    if not heat_loss_factors:
        raise AssertionError("No networks to read the heat loss factor from.")
    if len(heat_loss_factors) != 1:
        raise AssertionError("Varying loss factors are not supported.")
    return heat_loss_factors.pop()

get_latest_results_folder()

Find the results folder with the latest file system timestamp.

Source code in evals/utils.py
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
def get_latest_results_folder() -> Path:
    """
    Find the results folder with the latest file system timestamp.

    Scenario folders are the sub-directories of the sub-directories of
    the relative path "results".

    Returns
    -------
    :
        The scenario folder with the largest modification time.

    Raises
    ------
    FileNotFoundError
        If no scenario folder exists below the results folder.
    """
    results_root = Path("results")

    scenario_dirs = []
    for prefix in results_root.iterdir():
        if not prefix.is_dir():
            continue  # plain files next to the prefix folders are skipped
        scenario_dirs.extend(entry for entry in prefix.iterdir() if entry.is_dir())

    if scenario_dirs:
        # the folder with the largest modification time wins
        return max(scenario_dirs, key=lambda folder: folder.stat().st_mtime)

    raise FileNotFoundError(
        f"No scenario directories found under {results_root.resolve()}"
    )

get_location_alias(locations)

Return the location alias mapping depending on the clustering.

Constructs a mapping dictionary from location codes to human-readable names based on the detected clustering configuration. Automatically detects DE5/16 and AT10/35 clustering levels by counting the number of regional locations in the index.

Parameters:

Name Type Description Default
locations pandas.Index

Index containing location codes (e.g., 'DE1', 'AT211', 'EU').

required

Returns:

Type Description
dict

Dictionary mapping location codes to human-readable names. Includes country, region, and clustering-specific aliases.

Raises:

Type Description
ValueError

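Documented for DE or AT region counts that do not match the expected clustering configurations (DE5/16 or AT10/35). Note that the source code shown for this function logs a warning in that case and does not raise.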

Source code in evals/utils.py
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
def get_location_alias(locations: pd.Index) -> dict:
    """
    Build the location alias mapping that fits the clustering.

    The clustering is detected by counting the locations that start with
    "DE" and "AT", respectively:

    - 6 DE locations select the DE5 aliases, 17 select the DE16 aliases.
      Both are combined with the country and region aliases. Any other
      count logs a warning and only the country aliases are used.
    - 11 AT locations add the AT10 aliases, 36 add the AT35 aliases.
      Any other count logs a warning and adds nothing.

    Parameters
    ----------
    locations
        Index containing location codes (e.g., 'DE1', 'AT211', 'EU').

    Returns
    -------
    :
        An immutable mapping (``frozendict``) from location codes to
        human-readable names.
    """
    de_regions = [loc for loc in locations if loc.startswith("DE")]
    de_clustering = {
        6: ALIAS_REGION_DE5_CLUSTERING,  # DE5 clustering + Germany
        17: ALIAS_REGION_DE16_CLUSTERING,  # 16 Bundesländer + Germany
    }.get(len(de_regions))
    if de_clustering is None:
        logger.warning(f"Unexpected number of locations for DE: {len(de_regions)}.")
        alias = ALIAS_COUNTRY
    else:
        alias = ALIAS_COUNTRY | ALIAS_REGION | de_clustering

    at_regions = [loc for loc in locations if loc.startswith("AT")]
    at_clustering = {
        11: ALIAS_REGION_AT10_CLUSTERING,  # AT10 + Austria
        36: ALIAS_REGION_AT35_CLUSTERING,  # AT35 + Austria
    }.get(len(at_regions))
    if at_clustering is None:
        logger.warning(f"Unexpected number of locations for AT: {len(at_regions)}.")
    else:
        # build a new mapping; the alias constants must not be mutated
        alias = alias | at_clustering

    return frozendict(alias)

get_storage_carriers(nc)

Get the storage carriers from the networks.

Parameters:

Name Type Description Default
nc pypsa.NetworkCollection

The loaded networks.

required

Returns:

Type Description
list[str]

A list of storage carrier names.

Source code in evals/utils.py
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
def get_storage_carriers(nc: NetworkCollection) -> list[str]:
    """
    Collect the carrier names of all storage components.

    Parameters
    ----------
    nc
        The loaded networks.

    Returns
    -------
    :
        The sorted, de-duplicated carrier names found in the "Store"
        and "StorageUnit" components of all networks.
    """
    carriers = {
        carrier
        for network in nc
        for component in ("Store", "StorageUnit")
        for carrier in network.static(component)["carrier"].unique()
    }
    return sorted(carriers)

get_trade_type(bus_a, bus_b)

Determine the trade type between two buses.

Parameters:

Name Type Description Default
bus_a str

1st string that should start with a region substring.

required
bus_b str

2nd string that should start with a region substring.

required

Returns:

Type Description
str

The trade type. One of constants.TRADE_TYPES.

Source code in evals/utils.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def get_trade_type(bus_a: str, bus_b: str) -> str:
    """
    Classify the connection between two buses by their regions.

    Parameters
    ----------
    bus_a
        1st string that should start with a region substring.
    bus_b
        2nd string that should start with a region substring.

    Returns
    -------
    :
        The trade type. One of constants.TRADE_TYPES, or an empty
        string if no region is found in at least one of the inputs.
    """
    regions_a = re.findall(Regex.region, bus_a)
    regions_b = re.findall(Regex.region, bus_b)
    if not (regions_a and regions_b):  # no region(s) found
        return ""

    # only the first region match of each bus name is relevant
    first_a, first_b = regions_a[0], regions_b[0]
    if first_a == first_b:
        # transformation link in same region, e.g. heat
        return TradeTypes.LOCAL
    if first_a[:2] == first_b[:2]:  # country codes match
        return TradeTypes.DOMESTIC
    return TradeTypes.FOREIGN

get_transmission_techs(nc, bus_carrier=None)

Get the transmission technologies from the networks.

Parameters:

Name Type Description Default
nc pypsa.NetworkCollection

The loaded networks.

required
bus_carrier str | list

The bus carrier to filter for.

None

Returns:

Type Description
list[str]

A list of transmission technology names.

Source code in evals/utils.py
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
def get_transmission_techs(
    nc: NetworkCollection, bus_carrier: str | list = None
) -> list[str]:
    """
    Collect the transmission technologies of all networks.

    Parameters
    ----------
    nc
        The loaded networks.
    bus_carrier
        The bus carrier to filter for. Forwarded unchanged to
        `get_transmission_carriers`.

    Returns
    -------
    :
        The sorted, de-duplicated transmission technology names.
    """
    techs = set()
    for network in nc:
        techs.update(get_transmission_carriers(network, bus_carrier))

    return sorted(techs)

get_unit(s, ignore_suffix=True)

Parse the unit from a string.

The unit must be inside round parentheses. If multiple pairs of parentheses are found in the input string, the content of the last pair is returned.

Parameters:

Name Type Description Default
s str

The input string that should contain a unit.

required
ignore_suffix bool

Whether to strip the suffix, e.g. _th, _el, _LHV, ...

True

Returns:

Type Description
str

All characters inside the last pair of parentheses without the enclosing parentheses, or an empty string.

Source code in evals/utils.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def get_unit(s: str, ignore_suffix: bool = True) -> str:
    """
    Extract the unit from a string.

    The unit is taken from the last match of `Regex.unit` in the
    input string, with enclosing round parentheses removed.

    Parameters
    ----------
    s
        The input string that should contain a unit.
    ignore_suffix
        Whether to strip the suffix, e.g. `_th`, `_el`, `_LHV`, ...
        The suffix is everything after the last underscore.

    Returns
    -------
    :
        The unit without the enclosing parentheses, or an empty
        string if no unit is found.
    """
    found = re.findall(Regex.unit, s)
    if not found:
        return ""

    unit = found[-1].strip("()")
    if ignore_suffix and "_" in unit:
        # drop only the part behind the last underscore
        return unit.rsplit("_", 1)[0]
    return unit

insert_index_level(df, value, index_name, axis=0, pos=0)

Add an index level to the data frame.

Parameters:

Name Type Description Default
df pandas.DataFrame | pandas.Series

The data frame that will receive the new outer level index.

required
value str

The new index values.

required
index_name str

The new index level name.

required
axis optional

The index axis. Pass 0 for row index and 1 for column index.

0
pos optional

Move the new index name to this position. 0 is outer left, 1 is the second, and so on.

0

Returns:

Type Description
pandas.DataFrame | pandas.Series

The data frame with the new index level.

Source code in evals/utils.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def insert_index_level(
    df: pd.DataFrame | pd.Series,
    value: str,
    index_name: str,
    axis: int = 0,
    pos: int = 0,
) -> pd.DataFrame | pd.Series:
    """
    Add an index level to the data frame.

    Parameters
    ----------
    df
        The data frame that will receive the new outer level index.
    value
        The new index values.
    index_name
        The new index level name.
    axis : optional
        The index axis. Pass 0 for row index and 1 for column index.
    pos : optional
        Move the new index name to this position. 0 is outer left,
        1 is the second, and so on.

    Returns
    -------
    :
        The data frame with the new index level.
    """
    result = pd.concat({value: df}, names=[index_name], axis=axis)
    if pos == 0:  # no need to reorder levels. We are done inserting.
        return result
    idx = df.index if axis == 0 else df.columns
    idx_names = list(idx.names)
    idx_names.insert(pos, index_name)
    if isinstance(result, pd.DataFrame):
        return result.reorder_levels(idx_names, axis=axis)
    return result.reorder_levels(idx_names)

prettify_number(x)

Format a float for display on trace hover actions.

Parameters:

Name Type Description Default
x float

The imprecise value to format.

required

Returns:

Type Description
str

The formatted number as a string with 1 or 0 decimal places, depending on the magnitude of the input value.

Source code in evals/utils.py
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
def prettify_number(x: float) -> str:
    """
    Format a number for display on trace hover actions.

    Parameters
    ----------
    x
        The imprecise value to format.

    Returns
    -------
    :
        The number rounded to an integer if its absolute value is at
        least 10, otherwise the number with one decimal place.
    """
    if abs(x) >= 10:
        return str(int(round(x, 0)))
    return f"{round(x, 1):.1f}"

regionalize_statistics(supply, demand, bus_carrier)

Calculate regional balances for specific carriers.

Computes regional import/export balances by comparing supply and demand for specific bus carriers (e.g., oil, coal, lignite, NH3) across locations.

Parameters:

Name Type Description Default
supply pandas.Series

Supply statistics series.

required
demand pandas.DataFrame

Demand statistics series.

required
bus_carrier str | list

Bus carrier name(s) to analyze for regional trade.

required

Returns:

Type Description
pandas.Series

A single series with the regional import entries followed by the regional export entries. Imports are derived from non-positive balances (deficit), exports from positive balances (surplus).

Source code in evals/utils.py
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
def regionalize_statistics(
    supply: pd.Series, demand: pd.DataFrame, bus_carrier: str | list
) -> pd.Series:
    """
    Derive regional import and export amounts for a bus carrier.

    Supply and demand are filtered by the bus carrier, summed per year
    and location and added up to a regional balance. The balance is
    then split by sign into import and export entries.

    Parameters
    ----------
    supply
        Supply statistics series.
    demand
        Demand statistics series.
    bus_carrier
        Bus carrier name(s) to analyze for regional trade.
        NOTE(review): the value is also used as the label of the
        inserted bus carrier index level, which `insert_index_level`
        uses as a dictionary key. A list is not hashable, so passing
        one looks unsupported here - confirm with callers.

    Returns
    -------
    :
        One series with the import entries followed by the export
        entries. Non-positive balances are labelled "Global Import",
        positive balances "Global Export". Both are multiplied by -1.
        The location "EU" is dropped.
    """
    group_levels = [DataModel.YEAR, DataModel.LOCATION]
    supply_by_region = (
        filter_by(supply, bus_carrier=bus_carrier).groupby(group_levels).sum()
    )
    demand_by_region = (
        filter_by(demand, bus_carrier=bus_carrier).groupby(group_levels).sum()
    )

    balance = supply_by_region.add(demand_by_region, fill_value=0)
    # restore the index levels that were lost by the grouping
    balance = insert_index_level(balance, "Link", DataModel.COMPONENT, pos=1)
    balance = insert_index_level(balance, bus_carrier, DataModel.BUS_CARRIER, pos=3)
    balance = insert_index_level(balance, "trade", DataModel.CARRIER, pos=3)
    balance = balance.drop("EU", level=DataModel.LOCATION, errors="ignore")

    deficit = balance[balance.le(0)]
    surplus = balance[balance.gt(0)]
    imports = rename_aggregate(deficit, {"trade": "Global Import"}).mul(-1)
    exports = rename_aggregate(surplus, {"trade": "Global Export"}).mul(-1)

    return pd.concat([imports, exports])

rename_aggregate(df, mapper, level=DataModel.CARRIER, agg='sum')

Rename index values and aggregate duplicates.

In case the supplied mapper is a string, all values in the supplied level are replaced by this string.

Parameters:

Name Type Description Default
df pandas.DataFrame | pandas.Series

The input data frame.

required
mapper dict | str

A Dictionary with key-value pairs to rename index values, or a string used to replace all values in the given level.

required
level str

The index level name.

evals.constants.DataModel.CARRIER
agg str

The aggregation method for duplicated index values after renaming.

'sum'

Returns:

Type Description
pandas.Series | pandas.DataFrame

A data frame with renamed index values and aggregated values.

Notes

Support for column axis mapping was removed, because the groupby operation along axis=1 removes column level names and does not work correctly.

Source code in evals/utils.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def rename_aggregate(
    df: pd.DataFrame | pd.Series,
    mapper: dict | str,
    level: str = DataModel.CARRIER,
    agg: str = "sum",
) -> pd.Series | pd.DataFrame:
    """
    Rename labels in an index level and merge resulting duplicates.

    Parameters
    ----------
    df
        The input data frame or series.
    mapper
        A dictionary with old labels as keys and new labels as
        values. If a string is passed, every label in the level is
        replaced by this string.
    level
        The index level name.
    agg
        The aggregation method applied to rows that share the same
        index after renaming.

    Returns
    -------
    :
        The renamed data, grouped by all index levels and aggregated.

    Notes
    -----
    Only the row index is supported. Column axis mapping was removed,
    because the groupby operation along axis=1 removes column level
    names and does not work correctly.
    """
    if isinstance(mapper, str):
        replacement = mapper
        mapper = {label: replacement for label in df.index.unique(level=level)}

    group_keys = df.index.names
    return df.rename(mapper, level=level).groupby(group_keys).agg(agg)

scale(df, to_unit)

Scale metric values to the specified target unit.

Multiplies all columns in the metric by a scaling factor. The scaling factor is calculated from the unit in the data frame columns and the given target unit. Also updates the unit names encoded in the data frame columns for time aggregated metrics.

Parameters:

Name Type Description Default
df pandas.DataFrame

The input data frame with valid units in the column labels.

required
to_unit str

The target unit. See constants.UNITS for possible units.

required

Returns:

Type Description
pandas.DataFrame

The scaled data frame with replaced units in column labels.

Raises:

Type Description
KeyError

If the 'to_unit' is not found in UNITS, or if the attrs dictionary has no unit field.

ValueError

If input units are inconsistent, i.e. mixed power and energy columns.

Source code in evals/utils.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
def scale(df: pd.DataFrame, to_unit: str) -> pd.DataFrame:
    """
    Scale metric values to the specified target unit.

    Multiplies all columns in the metric by a scaling factor.
    For time series (columns named like the snapshots) the input unit
    is read from the `attrs` dictionary of the data frame. For time
    aggregated metrics the input units are parsed from the column
    labels and the unit names in the labels are replaced by the target
    unit.

    Parameters
    ----------
    df
        The input data frame with valid units in the column labels,
        or with a "unit" entry in `df.attrs` for time series.
    to_unit
        The target unit. See constants.UNITS for possible
        units. A trailing `_LHV`, `_th` or `_el` suffix is ignored
        for the scaling and kept in the "unit" attribute.

    Returns
    -------
    :
        The scaled data frame with replaced units in column labels.
        The target unit is stored in `attrs["unit"]`.

    Raises
    ------
    KeyError
        If the 'to_unit' or an input unit is not found in UNITS, or
        if the attrs dictionary has no unit field.
    ValueError
        If input units are inconsistent, i.e. mixed power and energy
        columns.
    """
    known_suffixes = ("_LHV", "_th", "_el")

    suffix = ""
    if to_unit.endswith(known_suffixes):
        # split at the last underscore only, the unit may contain more
        to_unit, suffix = to_unit.rsplit("_", 1)
    scale_to = UNITS[to_unit]

    if df.columns.name == DataModel.SNAPSHOTS:
        is_unit = df.attrs["unit"]
        if isinstance(is_unit, str):
            # the attribute may carry a suffix written by a former call
            if is_unit.endswith(known_suffixes):
                is_unit = is_unit.rsplit("_", 1)[0]
            scale_in = UNITS[is_unit]
        else:
            # NOTE(review): non-string values are treated as numeric
            # scaling factors - confirm attrs["unit"] is always a name.
            scale_in = is_unit
        # dividing the unit names themselves would raise a TypeError
        result = df.mul(scale_in / scale_to)
    else:
        units_in = list(map(get_unit, df.columns))
        if to_unit.endswith("h") and not all(u.endswith("h") for u in units_in):
            raise ValueError("Denying to convert units from power to energy.")
        if to_unit.endswith("W") and not all(u.endswith("W") for u in units_in):
            raise ValueError("Denying to convert unit from energy to power.")
        scaling_factors = [UNITS[u] / scale_to for u in units_in]

        result = df.mul(scaling_factors, axis=1)
        # Escape the units to match them literally and try longer units
        # first, so that a unit is never replaced by one of its prefixes.
        unit_pattern = "|".join(
            re.escape(u) for u in sorted(set(units_in), key=len, reverse=True)
        )
        result.columns = result.columns.str.replace(
            unit_pattern, to_unit, regex=True
        )

    result.attrs["unit"] = f"{to_unit}_{suffix}" if suffix else to_unit

    return result

split_location_carrier(index, names)

Split location and carrier in the index.

The location must be encoded in the string and match the regex '^[A-Z]{2}\d\s\d'. Subsequent characters become the carrier name. The location defaults to an empty string if the regex does not match.

Parameters:

Name Type Description Default
index pandas.MultiIndex

A pandas Multiindex with the innermost level to split.

required
names list

The list of output Multiindex names.

required

Returns:

Type Description
pandas.MultiIndex

The resulting Multiindex with one additional level due to the splitting.

Source code in evals/utils.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def split_location_carrier(index: pd.MultiIndex, names: list) -> pd.MultiIndex:
    """
    Split the innermost index level into location and carrier.

    The location is the part at the beginning of the label that
    matches `Regex.region`. The remaining characters become the
    carrier name. The location is an empty string if the regex does
    not match.

    Parameters
    ----------
    index
        A pandas Multiindex with the innermost level to split.
    names
        The list of output Multiindex names.

    Returns
    -------
    :
        The resulting Multiindex with one additional
        level due to the splitting.
    """

    def _split(label: str) -> tuple[str, str]:
        """Return the location and the remaining carrier of a label."""
        found = re.match(Regex.region, label)
        location = found.group().strip() if found else ""
        return location, label.removeprefix(location).strip()

    tuples = [(*outer, *_split(inner)) for *outer, inner in index]

    return pd.MultiIndex.from_tuples(tuples, names=names)

split_urban_central_heat_losses_and_consumption(df, heat_loss)

Split urban heat amounts by a heat loss factor.

Amounts for urban central heat contain distribution losses. However, the evaluation shows final demands in the results. Therefore, heat network distribution losses need to be separated from the total amounts because grid distribution losses do not arrive at the metering endpoint.

Parameters:

Name Type Description Default
df pandas.DataFrame | pandas.Series

The input data frame with values for urban central heat technologies.

required
heat_loss int

The heat loss factor from the configuration file.

required

Returns:

Type Description
pandas.DataFrame

The data frame with split heat amounts for end user demand (urban central heat), distribution grid losses (urban central heat losses) and anything else from the input data frame (not urban central heat).

Source code in evals/utils.py
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
def split_urban_central_heat_losses_and_consumption(
    df: pd.DataFrame | pd.Series, heat_loss: int
) -> pd.DataFrame:
    """
    Separate distribution losses from urban central heat amounts.

    Amounts for urban central heat contain distribution losses.
    The evaluation shows final demands, hence the share that is lost
    in the heat network is split off from the total amounts.

    Parameters
    ----------
    df
        The input data frame with values for urban central heat
        technologies.
    heat_loss
        The heat loss factor from the configuration file.

    Returns
    -------
    :
        The sorted concatenation of three parts: the entries that do
        not belong to the urban central heat bus carrier (unchanged),
        the urban central heat amounts reduced by the losses, and
        the losses. Loss entries whose carrier equals the urban
        central heat bus carrier are renamed to
        "urban central heat losses".
    """
    share_lost = heat_loss / (1 + heat_loss)
    carriers = [BusCarrier.HEAT_URBAN_CENTRAL]

    district_heat = filter_by(df, bus_carrier=carriers)
    other = filter_by(df, bus_carrier=carriers, exclude=True)

    delivered = district_heat.mul(1 - share_lost)
    lost = district_heat.mul(share_lost).rename(
        {carrier: "urban central heat losses" for carrier in carriers},
        level=DataModel.CARRIER,
    )

    return pd.concat([other, delivered, lost]).sort_index()

trade_mask(comp, scopes, buses=('bus0', 'bus1'))

Get the mask for a given trade type.

The logic only compares bus0 and bus1 in a given component.

Parameters:

Name Type Description Default
comp pandas.DataFrame

The component data frame. Should be a branch component, i.e. 'Line', 'Link', or 'Transformer'.

required
scopes str | tuple

The trade scope(s) to match. One or multiple of 'local', 'domestic', 'foreign'.

required
buses tuple

Two buses to determine the trade type from. The trade type will be 'local', 'domestic', or 'foreign', for same location, same country code, or different country code, respectively.

('bus0', 'bus1')

Returns:

Type Description
pandas.Series

A boolean pandas Series with the same index as the component data frame. The value is True where the trade type matches one of the scopes and False otherwise.

Raises:

Type Description
ValueError

In case the passed trade type is not supported and to prevent unintended string matches.

Source code in evals/utils.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def trade_mask(
    comp: pd.DataFrame, scopes: str | tuple, buses: tuple = ("bus0", "bus1")
) -> pd.Series:
    """
    Get the mask for a given trade type.

    The logic only compares the first two bus columns given in
    `buses`.

    Parameters
    ----------
    comp
        The component data frame. Should be a branch component,
        i.e. 'Line', 'Link', or 'Transformer'.
    scopes
        The trade scope(s) to match. One or multiple of 'local',
        'domestic', 'foreign'.
    buses
        Two buses to determine the trade type from. The trade type will
        be 'local', 'domestic', or 'foreign', for same location, same
        country code, or different country code, respectively.

    Returns
    -------
    :
        A boolean pandas Series with the same index as the component.
        True if the trade type of the row is one of the scopes, False
        otherwise. An empty component yields an empty Series.

    Raises
    ------
    ValueError
        In case the passed trade type is not supported and to prevent
        unintended string matches.
    """
    scopes = (scopes,) if isinstance(scopes, str) else scopes
    valid_scopes = {TradeTypes.LOCAL, TradeTypes.DOMESTIC, TradeTypes.FOREIGN}
    if unknown_scopes := set(scopes).difference(valid_scopes):
        raise ValueError(f"Invalid trade scopes detected: {unknown_scopes}.")

    # Build the Series explicitly instead of using a row-wise apply:
    # apply returns a DataFrame for an empty component, which would
    # break the documented return type.
    trade = pd.Series(
        [
            get_trade_type(bus_a, bus_b)
            for bus_a, bus_b in zip(comp[buses[0]], comp[buses[1]])
        ],
        index=comp.index,
        dtype=object,
    )
    return trade.isin(scopes)