Skip to content

fileio.py

Input - Output related functions.

Exporter

A class to export statistics.

The exporter data frame consists of multiple joined statistics, aggregated to countries and scaled to a specified unit. The data frame format is verified and expected by export functions.

Parameters:

Name Type Description Default
statistics list

A list of Series for time aggregated statistics or list of data frames for statistics with snapshots as columns.

required
view_config dict

The merged view configuration dictionary from :func:read_views_config.

required
Source code in evals/fileio.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
class Exporter:
    """
    A class to export statistics.

    The exporter data frame consists of multiple joined statistics,
    aggregated to countries and scaled to a specified unit. The
    data frame format is verified and expected by export functions.

    Parameters
    ----------
    statistics
        A list of Series for time aggregated statistics or list of
        data frames for statistics with snapshots as columns.
    view_config
        The merged view configuration dictionary from
        :func:`read_views_config`.
    """

    def __init__(
        self,
        statistics: list,
        view_config: dict,
    ):
        self.statistics = statistics
        units = {stat.attrs["unit"] for stat in statistics}
        if len(units) != 1:
            raise ValueError(f"Mixed units cannot be exported: {units}.")
        self.is_unit = units.pop()
        self.metric_name = view_config["name"]
        self.to_unit = view_config["unit"]
        self.view_config = view_config

        # keep_regions and region_nice_names come from global TOML config
        global_cfg = view_config["_global"]
        self.keep_regions = tuple(global_cfg["keep_regions"])
        self.region_nice_names = global_cfg["region_nice_names"]

        # build the plot config namespace from TOML global defaults
        self.defaults = build_plot_config(global_cfg)

        # apply per-view overrides from the view config
        title = view_config["name"] + TITLE_SUFFIX
        self.defaults.title = title
        self.defaults.file_name_template = view_config["file_name"]
        self.defaults.cutoff = view_config["cutoff"]
        self.defaults.category_orders = view_config["legend_order"]
        self.defaults.database_plot_type = view_config["database_plot_type"]
        self.defaults.database_bus_carrier = view_config["database_bus_carrier"]
        self.defaults.database_specifier = view_config["database_specifier"]

    @cached_property
    def df(self) -> pd.DataFrame:
        """
        Build the metric and store it as a cached property.

        (This is useful, because users do not need to remember
        building the metric data frame. It will be built once if needed)

        Returns
        -------
        :
            The cached metric data frame.
        """
        return combine_statistics(
            self.statistics,
            self.metric_name,
            self.is_unit,
            self.to_unit,
            self.keep_regions,
            self.region_nice_names,
        )

    @staticmethod
    def write_run_json(output_path: Path, run_config: dict) -> None:
        """
        Serialize the run attributes to a JSON file.

        The run.json file holds all attributes required to identify a
        run in the Run table of the database data model. All views and
        variables will be associated with this run database object.

        Parameters
        ----------
        output_path
            The path to the evaluation folder in a scenario run.
        run_config
            The merged run configuration dictionary with all scenario data.

        Returns
        -------
        :
        """
        scenario_name = output_path.parent.name
        resolution_space = run_config.get("mods", {}).get("modify_nuts3_shapes", "")
        resolution_time = run_config["clustering"]["temporal"]["resolution_sector"]

        with Path("pixi.toml").open("rb") as fh:
            project_settings = tomllib.load(fh)

        run_data = {
            "model": "PyPSA-AT",
            "scenario": f"{scenario_name} - {resolution_space} {resolution_time}",
            "version": project_settings["workspace"]["version"],
            "description": run_config.get("description", ""),
            "author": getpass.getuser(),
            "custom_metadata": RUN_META_DATA | run_config,
        }
        run_file_path = output_path / "JSON" / "run.json"
        with run_file_path.open("w", encoding="utf-8") as fh:
            json.dump(run_data, fh, indent=4)

    def export_views(self, output_path: Path) -> None:
        """
        Create the plotly figure and export it as HTML and JSON.

        Parameters
        ----------
        output_path
            The path to the folder where HTML, JSON and
            CSV subdirectories are created.
        """
        cfg = self.defaults
        df = rename_aggregate(
            self.df, level=cfg.plot_category, mapper=self.view_config["categories"]
        )

        df_plot = df.pivot_table(
            index=cfg.pivot_index, columns=cfg.pivot_columns, aggfunc="sum"
        )

        # needed for upload API data bundle ingestion
        self.write_run_json(output_path, self.view_config["meta"])

        for idx, data in df_plot.groupby(cfg.plotby):
            chart = cfg.chart(data, cfg)
            chart.plot()
            exporter = FileExporter(cfg, chart.metric_name)
            exporter.to_html(chart.fig, output_path, cfg.plotby, idx)
            year = None
            if DataModel.YEAR in cfg.plotby:
                if DataModel.YEAR in data.index.names:
                    year = data.index.unique(DataModel.YEAR).item()
                else:
                    year = getattr(chart, "year", None)
            exporter.to_json(
                chart.fig,
                chart.location,
                year,
                output_path,
                cfg.plotby,
                idx,
            )

    def export_csv(self, output_path: Path) -> None:
        """
        Encode the metric data frame to a CSV file.

        Parameters
        ----------
        output_path
            The path to the CSV folder where all the csv files are
            stored.

        Returns
        -------
        :
            Writes the metric to a CSV file.
        """
        file_name = self.defaults.file_name_template.split("_{", maxsplit=1)[0]
        file_path = output_path / "CSV" / f"{file_name}_{NOW}.csv"
        self.df.to_csv(file_path, encoding="utf-8")

    def export(self, result_path: Path, subdir: str) -> None:
        """
        Export the metric to formats specified in the config.

        Parameters
        ----------
        result_path
            The path to the results folder.
        subdir
            The subdirectory inside the results folder to store evaluation results under.

        Returns
        -------
        :
        """
        # apply configuration switches that depend on the requested chart
        chart_class = getattr(plots, self.view_config["chart"])
        self.defaults.chart = chart_class

        if chart_class == plots.ESMGroupedBarChart:
            self.defaults.xaxis_title = ""
        elif chart_class == plots.ESMTimeSeriesChart:
            self.defaults.xaxis_title = ""
            self.defaults.plotby = [DataModel.YEAR, DataModel.LOCATION]
            self.defaults.pivot_index = [
                DataModel.YEAR,
                DataModel.LOCATION,
                DataModel.CARRIER,
            ]
        elif (
            chart_class == plots.ESMBarChart
            and self.defaults.plot_category == DataModel.CARRIER
        ):
            # combine bus carrier to export netted technologies, although
            # they have difference bus_carrier in index, e.g.
            # electricity distribution grid, (AC, low voltage)
            first_bus_carrier = self.statistics[0].index.unique("bus_carrier")[0]
            self.statistics = [
                rename_aggregate(stat, first_bus_carrier, level=DataModel.BUS_CARRIER)
                for stat in self.statistics
            ]

        output_path = self.make_evaluation_result_directories(result_path, subdir)

        self.export_views(output_path)

        export_formats = self.view_config.get("exports", [])
        if "csv" in export_formats:
            self.export_csv(output_path)

        # always run tests after the export
        self.consistency_checks()

    def consistency_checks(self) -> None:
        """
        Run plausibility and consistency checks on a metric.

        The method typically is called after exporting the metric.
        Unmapped categories do not cause evaluations to fail, but
        the evaluation function should return in error state to obviate
        missing entries in the mapping.

        Parameter
        ---------
        config_checks
            A dictionary with flags for every test to run.

        Returns
        -------
        :

        Raises
        ------
        AssertionError
            In case one of the checks fails.
        """
        self.default_checks()

        if "balances_almost_zero" in self.view_config.get("checks", []):
            groups = [DataModel.YEAR, DataModel.LOCATION]
            yearly_sum = self.df.groupby(groups).sum().abs()
            balanced = yearly_sum < self.view_config["cutoff"]
            if isinstance(balanced, pd.DataFrame):
                if not balanced.all().all():
                    raise ValueError(
                        f"Imbalances detected: {yearly_sum[balanced == False].dropna(how='all').sort_values(by=balanced.columns[0], na_position='first').tail()}"
                    )
            else:  # Series
                if not balanced.all().item():
                    raise ValueError(
                        f"Imbalances detected: {yearly_sum[balanced.squeeze() == False].squeeze().sort_values().tail()}"
                    )

    def default_checks(self) -> None:
        """Perform integrity checks for views."""
        if self.view_config.get("chart") == "SankeyChart":
            return  # bypass all checks, because Sankey has its own set of assertions

        category = self.defaults.plot_category
        categories = self.view_config["categories"]

        if not self.df.index.unique(category).isin(categories.keys()).all():
            missing_cats = self.df.index.unique(category).difference(categories.keys())
            raise ValueError(
                f"Incomplete categories detected. There are technologies in the metric "
                f"data frame that are not assigned to a group (nice name)."
                f"\nMissing items: {missing_cats}"
            )

        superfluous_categories = self.df.index.unique(category).difference(
            categories.keys()
        )
        if len(superfluous_categories) > 0:
            logger.warning(f"Superfluous categories defined: {superfluous_categories}")

        a = set(self.view_config["legend_order"])
        b = set(categories.values())
        additional = a.difference(b)
        if additional:
            raise ValueError(
                f"Superfluous categories defined in legend order: {additional}"
            )
        missing = b.difference(a)
        if missing:
            raise ValueError(
                f"Some categories are not defined in legend order: {missing}"
            )

        no_color = [c for c in categories.values() if c not in COLOUR_SCHEME]
        if no_color:
            raise ValueError(
                f"Some categories used in the view do not have a color assigned: {no_color}"
            )

    def make_evaluation_result_directories(
        self, result_path: Path, subdir: Path | str
    ) -> Path:
        """
        Create all directories needed to store evaluations results.

        Parameters
        ----------
        result_path
            The path of the result folder.
        subdir
            A relative path inside the result folder.

        Returns
        -------
        :
            The joined path: result_dir / subdir.
        """
        output_path = self.make_directory(result_path, subdir)
        self.make_directory(output_path, "HTML")
        self.make_directory(output_path, "JSON")
        self.make_directory(output_path, "CSV")

        return output_path

    @staticmethod
    def make_directory(base: Path, subdir: Path | str) -> Path:
        """
        Create a directory and return its path.

        Parameters
        ----------
        base
            The path to base of the new folder.
        subdir
            A relative path inside the base folder.

        Returns
        -------
        :
            The joined path: result_dir / subdir / now.
        """
        base = Path(base).resolve()
        if not base.is_dir():
            raise NotADirectoryError(f"Base path does not exist: {base}.")
        directory_path = base / subdir
        directory_path.mkdir(parents=True, exist_ok=True)

        return directory_path

df cached property

Build the metric and store it as a cached property.

(This is useful, because users do not need to remember building the metric data frame. It will be built once if needed)

Returns:

Type Description
pandas.DataFrame

The cached metric data frame.

consistency_checks()

Run plausibility and consistency checks on a metric.

The method typically is called after exporting the metric. Unmapped categories do not cause evaluations to fail, but the evaluation function should return in error state to obviate missing entries in the mapping.

Parameter

config_checks A dictionary with flags for every test to run.

Returns:

Type Description
None

Raises:

Type Description
AssertionError

In case one of the checks fails.

Source code in evals/fileio.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
def consistency_checks(self) -> None:
    """
    Run plausibility and consistency checks on a metric.

    The method typically is called after exporting the metric.
    Unmapped categories do not cause evaluations to fail, but
    the evaluation function should return in error state to obviate
    missing entries in the mapping.

    Parameter
    ---------
    config_checks
        A dictionary with flags for every test to run.

    Returns
    -------
    :

    Raises
    ------
    AssertionError
        In case one of the checks fails.
    """
    self.default_checks()

    if "balances_almost_zero" in self.view_config.get("checks", []):
        groups = [DataModel.YEAR, DataModel.LOCATION]
        yearly_sum = self.df.groupby(groups).sum().abs()
        balanced = yearly_sum < self.view_config["cutoff"]
        if isinstance(balanced, pd.DataFrame):
            if not balanced.all().all():
                raise ValueError(
                    f"Imbalances detected: {yearly_sum[balanced == False].dropna(how='all').sort_values(by=balanced.columns[0], na_position='first').tail()}"
                )
        else:  # Series
            if not balanced.all().item():
                raise ValueError(
                    f"Imbalances detected: {yearly_sum[balanced.squeeze() == False].squeeze().sort_values().tail()}"
                )

default_checks()

Perform integrity checks for views.

Source code in evals/fileio.py
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
def default_checks(self) -> None:
    """Perform integrity checks for views."""
    if self.view_config.get("chart") == "SankeyChart":
        return  # bypass all checks, because Sankey has its own set of assertions

    category = self.defaults.plot_category
    categories = self.view_config["categories"]

    if not self.df.index.unique(category).isin(categories.keys()).all():
        missing_cats = self.df.index.unique(category).difference(categories.keys())
        raise ValueError(
            f"Incomplete categories detected. There are technologies in the metric "
            f"data frame that are not assigned to a group (nice name)."
            f"\nMissing items: {missing_cats}"
        )

    superfluous_categories = self.df.index.unique(category).difference(
        categories.keys()
    )
    if len(superfluous_categories) > 0:
        logger.warning(f"Superfluous categories defined: {superfluous_categories}")

    a = set(self.view_config["legend_order"])
    b = set(categories.values())
    additional = a.difference(b)
    if additional:
        raise ValueError(
            f"Superfluous categories defined in legend order: {additional}"
        )
    missing = b.difference(a)
    if missing:
        raise ValueError(
            f"Some categories are not defined in legend order: {missing}"
        )

    no_color = [c for c in categories.values() if c not in COLOUR_SCHEME]
    if no_color:
        raise ValueError(
            f"Some categories used in the view do not have a color assigned: {no_color}"
        )

export(result_path, subdir)

Export the metric to formats specified in the config.

Parameters:

Name Type Description Default
result_path pathlib.Path

The path to the results folder.

required
subdir str

The subdirectory inside the results folder to store evaluation results under.

required

Returns:

Type Description
None
Source code in evals/fileio.py
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
def export(self, result_path: Path, subdir: str) -> None:
    """
    Export the metric to formats specified in the config.

    Parameters
    ----------
    result_path
        The path to the results folder.
    subdir
        The subdirectory inside the results folder to store evaluation results under.

    Returns
    -------
    :
    """
    # apply configuration switches that depend on the requested chart
    chart_class = getattr(plots, self.view_config["chart"])
    self.defaults.chart = chart_class

    if chart_class == plots.ESMGroupedBarChart:
        self.defaults.xaxis_title = ""
    elif chart_class == plots.ESMTimeSeriesChart:
        self.defaults.xaxis_title = ""
        self.defaults.plotby = [DataModel.YEAR, DataModel.LOCATION]
        self.defaults.pivot_index = [
            DataModel.YEAR,
            DataModel.LOCATION,
            DataModel.CARRIER,
        ]
    elif (
        chart_class == plots.ESMBarChart
        and self.defaults.plot_category == DataModel.CARRIER
    ):
        # combine bus carrier to export netted technologies, although
        # they have difference bus_carrier in index, e.g.
        # electricity distribution grid, (AC, low voltage)
        first_bus_carrier = self.statistics[0].index.unique("bus_carrier")[0]
        self.statistics = [
            rename_aggregate(stat, first_bus_carrier, level=DataModel.BUS_CARRIER)
            for stat in self.statistics
        ]

    output_path = self.make_evaluation_result_directories(result_path, subdir)

    self.export_views(output_path)

    export_formats = self.view_config.get("exports", [])
    if "csv" in export_formats:
        self.export_csv(output_path)

    # always run tests after the export
    self.consistency_checks()

export_csv(output_path)

Encode the metric data frame to a CSV file.

Parameters:

Name Type Description Default
output_path pathlib.Path

The path to the CSV folder where all the csv files are stored.

required

Returns:

Type Description
None

Writes the metric to a CSV file.

Source code in evals/fileio.py
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
def export_csv(self, output_path: Path) -> None:
    """
    Encode the metric data frame to a CSV file.

    Parameters
    ----------
    output_path
        The path to the CSV folder where all the csv files are
        stored.

    Returns
    -------
    :
        Writes the metric to a CSV file.
    """
    file_name = self.defaults.file_name_template.split("_{", maxsplit=1)[0]
    file_path = output_path / "CSV" / f"{file_name}_{NOW}.csv"
    self.df.to_csv(file_path, encoding="utf-8")

export_views(output_path)

Create the plotly figure and export it as HTML and JSON.

Parameters:

Name Type Description Default
output_path pathlib.Path

The path to the folder where HTML, JSON and CSV subdirectories are created.

required
Source code in evals/fileio.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
def export_views(self, output_path: Path) -> None:
    """
    Create the plotly figure and export it as HTML and JSON.

    Parameters
    ----------
    output_path
        The path to the folder where HTML, JSON and
        CSV subdirectories are created.
    """
    cfg = self.defaults
    df = rename_aggregate(
        self.df, level=cfg.plot_category, mapper=self.view_config["categories"]
    )

    df_plot = df.pivot_table(
        index=cfg.pivot_index, columns=cfg.pivot_columns, aggfunc="sum"
    )

    # needed for upload API data bundle ingestion
    self.write_run_json(output_path, self.view_config["meta"])

    for idx, data in df_plot.groupby(cfg.plotby):
        chart = cfg.chart(data, cfg)
        chart.plot()
        exporter = FileExporter(cfg, chart.metric_name)
        exporter.to_html(chart.fig, output_path, cfg.plotby, idx)
        year = None
        if DataModel.YEAR in cfg.plotby:
            if DataModel.YEAR in data.index.names:
                year = data.index.unique(DataModel.YEAR).item()
            else:
                year = getattr(chart, "year", None)
        exporter.to_json(
            chart.fig,
            chart.location,
            year,
            output_path,
            cfg.plotby,
            idx,
        )

make_directory(base, subdir) staticmethod

Create a directory and return its path.

Parameters:

Name Type Description Default
base pathlib.Path

The path to base of the new folder.

required
subdir pathlib.Path | str

A relative path inside the base folder.

required

Returns:

Type Description
pathlib.Path

The joined path: result_dir / subdir / now.

Source code in evals/fileio.py
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
@staticmethod
def make_directory(base: Path, subdir: Path | str) -> Path:
    """
    Create a directory and return its path.

    Parameters
    ----------
    base
        The path to base of the new folder.
    subdir
        A relative path inside the base folder.

    Returns
    -------
    :
        The joined path: result_dir / subdir / now.
    """
    base = Path(base).resolve()
    if not base.is_dir():
        raise NotADirectoryError(f"Base path does not exist: {base}.")
    directory_path = base / subdir
    directory_path.mkdir(parents=True, exist_ok=True)

    return directory_path

make_evaluation_result_directories(result_path, subdir)

Create all directories needed to store evaluations results.

Parameters:

Name Type Description Default
result_path pathlib.Path

The path of the result folder.

required
subdir pathlib.Path | str

A relative path inside the result folder.

required

Returns:

Type Description
pathlib.Path

The joined path: result_dir / subdir.

Source code in evals/fileio.py
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
def make_evaluation_result_directories(
    self, result_path: Path, subdir: Path | str
) -> Path:
    """
    Create all directories needed to store evaluations results.

    Parameters
    ----------
    result_path
        The path of the result folder.
    subdir
        A relative path inside the result folder.

    Returns
    -------
    :
        The joined path: result_dir / subdir.
    """
    output_path = self.make_directory(result_path, subdir)
    self.make_directory(output_path, "HTML")
    self.make_directory(output_path, "JSON")
    self.make_directory(output_path, "CSV")

    return output_path

write_run_json(output_path, run_config) staticmethod

Serialize the run attributes to a JSON file.

The run.json file holds all attributes required to identify a run in the Run table of the database data model. All views and variables will be associated with this run database object.

Parameters:

Name Type Description Default
output_path pathlib.Path

The path to the evaluation folder in a scenario run.

required
run_config dict

The merged run configuration dictionary with all scenario data.

required

Returns:

Type Description
None
Source code in evals/fileio.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
@staticmethod
def write_run_json(output_path: Path, run_config: dict) -> None:
    """
    Serialize the run attributes to a JSON file.

    The run.json file holds all attributes required to identify a
    run in the Run table of the database data model. All views and
    variables will be associated with this run database object.

    Parameters
    ----------
    output_path
        The path to the evaluation folder in a scenario run.
    run_config
        The merged run configuration dictionary with all scenario data.

    Returns
    -------
    :
    """
    scenario_name = output_path.parent.name
    resolution_space = run_config.get("mods", {}).get("modify_nuts3_shapes", "")
    resolution_time = run_config["clustering"]["temporal"]["resolution_sector"]

    with Path("pixi.toml").open("rb") as fh:
        project_settings = tomllib.load(fh)

    run_data = {
        "model": "PyPSA-AT",
        "scenario": f"{scenario_name} - {resolution_space} {resolution_time}",
        "version": project_settings["workspace"]["version"],
        "description": run_config.get("description", ""),
        "author": getpass.getuser(),
        "custom_metadata": RUN_META_DATA | run_config,
    }
    run_file_path = output_path / "JSON" / "run.json"
    with run_file_path.open("w", encoding="utf-8") as fh:
        json.dump(run_data, fh, indent=4)

get_location(n, c, port='', avoid_eu_locations=True)

Return the grouper series for the location of a component.

By default, the function avoids EU-locations by looking into port 0 and port 1 and prefering locations, that are not 'EU'.

Note, that the bus_carrier will still be the bus_carrier from the "port" argument, i.e. only the location is swapped.

Parameters:

Name Type Description Default
n pypsa.Network

The network to evaluate.

required
c str

The component name, e.g. 'Load', 'Generator', 'Link', etc.

required
port str

Limit results to this branch port.

''
avoid_eu_locations bool

Look into the port 0 and port 1 location in branch components and prefer locations that are not 'EU'. By default, pypsa.statistics assigns the respective bus port location.

True

Returns:

Type Description
pandas.Series

A list of series to group statistics by.

Source code in evals/fileio.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def get_location(
    n: pypsa.Network,
    c: str,
    port: str = "",
    avoid_eu_locations: bool = True,
) -> pd.Series:
    """
    Return the grouper series for the location of a component.

    By default, the function avoids EU-locations by looking into port 0 and port 1 and prefering locations, that are not 'EU'.

    Note, that the bus_carrier will still be the bus_carrier
    from the "port" argument, i.e. only the location is swapped.

    Parameters
    ----------
    n
        The network to evaluate.
    c
        The component name, e.g. 'Load', 'Generator', 'Link', etc.
    port
        Limit results to this branch port.
    avoid_eu_locations
        Look into the port 0 and port 1 location in branch components
        and prefer locations that are not 'EU'. By default,
        pypsa.statistics assigns the respective bus port location.

    Returns
    -------
    :
        A list of series to group statistics by.
    """
    comp = n.components[c].static
    bus_locations = n.components.buses.static.location

    if avoid_eu_locations and c in n.branch_components:
        # avoid EU buses for branch components, e.g. oil CHP
        bus0 = groupers._map_with_multiindex(comp["bus0"], bus_locations).rename("loc0")
        bus1 = groupers._map_with_multiindex(comp["bus1"], bus_locations).rename("loc1")
        buses = pd.concat([bus0, bus1], axis=1)

        def location_selection_logic(row) -> str:
            if row.loc0 != "EU" or pd.isna(row.loc1):
                return row.loc0
            return row.loc1

        return buses.apply(location_selection_logic, axis=1).rename("location")

    # default logic to return location groupers
    return groupers._map_with_multiindex(comp[f"bus{port}"], bus_locations).rename(
        "location"
    )

get_location_from_name_at_port(n, c, location_port='')

Return the location from the component name.

Parameters:

Name Type Description Default
n pypsa.Network

The network to evaluate.

required
c str

The component name, e.g. 'Load', 'Generator', 'Link', etc.

required
location_port str

Limit results to this branch port.

''

Returns:

Type Description
pandas.Series
Source code in evals/fileio.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def get_location_from_name_at_port(
    n: pypsa.Network, c: str, location_port: str = ""
) -> pd.Series:
    """
    Return the location from the component name.

    Parameters
    ----------
    n
        The network to evaluate.
    c
        The component name, e.g. 'Load', 'Generator', 'Link', etc.
    location_port
        Limit results to this branch port.

    Returns
    -------
    :

    """
    group = f"({Regex.region.pattern})"
    return (
        n.static(c)[f"bus{location_port}"]
        .str.extract(group, expand=False)
        .str.strip()  # some white spaces still go through regex
        .rename(f"bus{location_port}")
    )

read_networks(result_path, sub_directory='networks')

Read network results from NetCDF (.nc) files.

The function returns a dictionary of data frames. The planning horizon (year) is used as dictionary key and added to the network as an attribute to associate the year with it. Network snapshots are equal for all networks, although the year changes. This is required to align timestamp columns in a data frame. Snapshots will become fixed late in the evaluation process (just before export to file).

In addition, the function patches the statistics accessor attached to loaded networks and adds the configuration under n.meta if it is missing.

Parameters:

Name Type Description Default
result_path str | pathlib.Path | list[str | pathlib.Path]

Absolute or relative path to the run results folder that contains all model results (typically ends with "results", or is a time-stamp), or list of .nc file paths to load.

required
sub_directory str

The subdirectory name to read files from relative to the result folder.

'networks'

Returns:

Type Description
pypsa.NetworkCollection

A NetworkCollection keyed by planning horizon year (str), with each network's statistics accessor patched to ESMStatistics.

Raises:

Type Description
FileNotFoundError

If no network files are found in the specified location.

Examples:

Load networks from a results directory:

>>> networks = read_networks("results/scenario_2030")
>>> networks.index.tolist()
['2030', '2040', '2050']

Load specific network files:

>>> networks = read_networks([
...     "results/elec_s_37_2030.nc",
...     "results/elec_s_37_2040.nc"
... ])
Source code in evals/fileio.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def read_networks(
    result_path: str | Path | list[str | Path], sub_directory: str = "networks"
) -> NetworkCollection:
    """
    Read network results from NetCDF (.nc) files.

    The function returns a dictionary of data frames. The planning
    horizon (year) is used as dictionary key and added to the network
    as an attribute to associate the year with it. Network snapshots
    are equal for all networks, although the year changes. This is
    required to align timestamp columns in a data frame. Snapshots
    will become fixed late in the evaluation process (just before
    export to file).

    In addition, the function patches the statistics accessor attached
    to loaded networks and adds the configuration under n.meta if it is
    missing.

    Parameters
    ----------
    result_path
        Absolute or relative path to the run results folder that
        contains all model results (typically ends with "results",
        or is a time-stamp), or list of .nc file paths to load.
    sub_directory
        The subdirectory name to read files from relative to the
        result folder.

    Returns
    -------
    :
        A NetworkCollection keyed by planning horizon year (str),
        with each network's statistics accessor patched to ESMStatistics.

    Raises
    ------
    FileNotFoundError
        If no network files are found in the specified location.

    Examples
    --------
    Load networks from a results directory:

    >>> networks = read_networks("results/scenario_2030")
    >>> networks.index.tolist()
    ['2030', '2040', '2050']

    Load specific network files:

    >>> networks = read_networks([
    ...     "results/elec_s_37_2030.nc",
    ...     "results/elec_s_37_2040.nc"
    ... ])
    """
    if isinstance(result_path, list):
        # expecting snakemake.input.networks
        file_paths = [Path(p) for p in result_path]
    else:
        input_path = Path(result_path) / sub_directory
        file_paths = input_path.glob(r"*[0-9].nc")

    file_paths = [*file_paths]  # store paths in a list for error messages
    networks = {}
    for file_path in file_paths:
        year = re.search(Regex.year, file_path.stem).group()
        n = pypsa.Network(file_path)
        n.statistics = ESMStatistics(n)  # register custom statistics
        n.year = year  # for convenience
        networks[year] = n

    if not networks:
        raise FileNotFoundError(f"No networks found in {file_paths}.")

    # needed to make NetworkCollections work
    nc = NetworkCollection(networks)
    nc.statistics = ESMStatistics(nc)

    return nc

read_views_config(func, config_override='config.override.toml')

Return the configuration for a view function.

The function reads the default configuration from the TOML file and optionally updates it using the configuration items in the override file. The configuration returned is stripped down to the relevant parts that matter for the called view function.

Parameters:

Name Type Description Default
func collections.abc.Callable

The view function to be called by the CLI module.

required
config_override str | None

A file name as a string as passed to the CLI module, or None to use only default configuration.

'config.override.toml'

Returns:

Type Description
dict

Dictionary containing 'global' and 'view' configuration sections with optional overrides applied from the second configuration file.

Examples:

>>> config = read_views_config(view_balance_electricity)
>>> config.keys()
dict_keys(['global', 'view'])
Source code in evals/fileio.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def read_views_config(
    func: Callable, config_override: str | None = "config.override.toml"
) -> dict:
    """
    Return the configuration for a view function.

    The function reads the default configuration from the
    TOML file and optionally updates it using the configuration
    items in the override file. The configuration returned
    is stripped down to the relevant parts that matter for the
    called view function.

    Parameters
    ----------
    func
        The view function to be called by the CLI module.
    config_override
        A file name as a string as passed to the CLI module, or None
        to use only default configuration.

    Returns
    -------
    :
        Dictionary containing 'global' and 'view' configuration sections
        with optional overrides applied from the second configuration file.

    Examples
    --------
    >>> config = read_views_config(view_balance_electricity)
    >>> config.keys()
    dict_keys(['global', 'view'])
    """
    default_fp = resources.files("evals") / "config.default.toml"
    default = tomllib.load(default_fp.open("rb"))
    default_global = default["global"]
    default_view = default[func.__name__]

    if config_override:
        override_fp = Path(resources.files("evals")) / config_override
        override = tomllib.load(override_fp.open("rb"))
        default_global = deep_update(default_global, override["global"])

        if override_view := override.get(func.__name__, {}):
            default_view = deep_update(default_view, override_view)

    # inject global config into view dict so Exporter can access it without
    # requiring callers to change the view_config=config["view"] call pattern
    default_view["_global"] = default_global
    config = {"global": default_global, "view": default_view}

    logger = logging.getLogger()
    logger.debug(f"Configuration items: {config}")

    return config