"""Module providing an abstract base classes for creating specific trackers."""
import importlib.metadata
import json
import subprocess
import tempfile
import warnings
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager, nullcontext
from dataclasses import asdict, dataclass, fields
from datetime import timedelta
from typing import IO, TypedDict, Union
import cf
from cftime import date2num, datetime
from .trajectory import Trajectory
[docs]
@dataclass
class TCTrackerParameters:
    """
    Base dataclass holding the parameters of a TCTracker implementation.

    Algorithm-specific parameter sets should subclass this class and be declared
    with ``@dataclass(repr=False)`` so that the multi-line representation defined
    here is inherited instead of being replaced by the dataclass-generated one.

    Examples
    --------
    >>> from dataclasses import dataclass
    >>> from tctrack.core import TCTrackerParameters
    >>>
    >>> @dataclass(repr=False)
    ... class MyTrackerParameters(TCTrackerParameters):
    ...     param_a: int
    ...     param_b: str
    >>>
    >>> params = MyTrackerParameters(param_a=1, param_b="example")
    >>> print(params)  # fields are tab-separated; shown here with spaces
    MyTrackerParameters(
        param_a      = 1
        param_b      = example
    )
    """

    def __repr__(self) -> str:
        """Return the class name followed by one ``name = value`` line per field."""
        rendered_fields = [
            f"{entry.name} \t = {getattr(self, entry.name)}" for entry in fields(self)
        ]
        body = "\n\t".join(rendered_fields)
        return f"{type(self).__name__}(\n\t{body}\n)"

    def __str__(self) -> str:
        """Return the same text as :meth:`__repr__`."""
        return self.__repr__()
def is_typed_dict_instance(data, typed_dict_class):
    """
    Report whether ``data`` is a dict holding every required key of a TypedDict.

    Only key presence is checked; value types are not inspected and extra keys
    are allowed.

    Parameters
    ----------
    data
        The object to inspect.
    typed_dict_class
        A class exposing ``__required_keys__`` (as ``typing.TypedDict`` classes do).

    Returns
    -------
    bool
        ``True`` if ``data`` is a ``dict`` containing all required keys.
    """
    if not isinstance(data, dict):
        return False
    absent = [name for name in typed_dict_class.__required_keys__ if name not in data]
    return not absent
[docs]
class TCTracker(ABC):
"""
Abstract Base Class representing a generic TCTracker class.

Concrete trackers subclass this class and implement its abstract members.

Attributes
----------
_variable_metadata : dict[str, TCTrackerMetadata] | None
A dictionary containing metadata for variables.
This attribute must be initialized by the subclass through the
:meth:`_set_metadata` method, prior to which it is initialized as ``None``.
_time_metadata : TCTrackerTimeMetadata | None
A dictionary containing metadata for times including calendar, units, and start
and end times of the dataset.
This attribute must be initialized by the subclass through the
:meth:`_set_metadata` method, prior to which it is initialized as ``None``.
_global_metadata : dict[str, str]
A dictionary containing global metadata about the data and TCTrack parameters.
This will be populated by the base class, but additional subclass-specific
metadata can be added using the :meth:`_set_metadata` method.
"""
# Private attributes
# Per-variable metadata; None until a subclass assigns it.
_variable_metadata: dict[str, TCTrackerMetadata] | None = None
# Time metadata (calendar, units, start/end times); None until assigned.
_time_metadata: TCTrackerTimeMetadata | None = None
# Annotation only, no class-level default: the attribute does not exist on an
# instance until it has been assigned.
_global_metadata: dict[str, str]
@property
@abstractmethod
def _parameters(self) -> list[TCTrackerParameters]:
"""
list[TCTrackerParameters]: The parameter objects of this tracker.

Declared abstract so that every subclass must provide it, which makes the
parameters accessible from the base class. This base implementation returns
an empty list.
"""
return []
@property
def variable_metadata(self) -> dict:
    """
    dict: Read-only view of the NetCDF metadata defined for each variable.

    Raises
    ------
    AttributeError
        If `_variable_metadata` is still unset (``None``) or is empty.
    """
    if self._variable_metadata:
        return self._variable_metadata
    err_msg = "_variable_metadata has not been initialized."
    raise AttributeError(err_msg)
@property
def time_metadata(self) -> TCTrackerTimeMetadata:
    """
    dict: Read-only view of the time metadata for this Tracker run.

    Raises
    ------
    AttributeError
        If `_time_metadata` is still unset (``None``) or is empty.
    TypeError
        If `_time_metadata` is not a dict containing every required key of
        `TCTrackerTimeMetadata`.
    """
    if not self._time_metadata:
        err_msg = "_time_metadata has not been initialized."
        raise AttributeError(err_msg)

    # Past the guard above the value is known to be truthy, so only its shape
    # remains to be checked.
    if is_typed_dict_instance(self._time_metadata, TCTrackerTimeMetadata):
        return self._time_metadata

    err_msg = (
        "_time_metadata does not conform to the expected format of "
        "`TCTrackerTimeMetadata`."
    )
    raise TypeError(err_msg)
@property
def global_metadata(self) -> dict:
    """
    dict: Read-only view of the tctrack / tracker instance global metadata.

    Raises
    ------
    AttributeError
        If `_global_metadata` has never been assigned on this object.
    """
    if hasattr(self, "_global_metadata"):
        return self._global_metadata
    err_msg = "_global_metadata has not been initialized."
    raise AttributeError(err_msg)
@abstractmethod
def _set_metadata(self) -> None:
"""Abstract method to initialize subclass-specific metadata.

This method must be implemented by subclasses to populate the
:attr:`_variable_metadata` attribute with relevant metadata for variables and
:attr:`_time_metadata` with metadata about the time for the dataset.
This will be called from the :meth:`set_metadata` method.

Notes
-----
The :attr:`_variable_metadata` attribute is expected to be a dictionary where
keys are variable names and values are instances of :class:`TCTrackerMetadata`
containing the metadata for that variable (e.g., `standard_name`, `long_name`,
`units`).
The :attr:`_time_metadata` attribute is expected to be a typed dictionary of
the :class:`TCTrackerTimeMetadata` form containing the calendar type and units
of the dataset, as well as the start and end times.
The :attr:`_global_metadata` attribute is defined in :meth:`set_metadata`.
Additional key-value pairs can be added, but it should not be overwritten.

Examples
--------
>>> class MyTracker(TCTracker):
... def _set_metadata(self):
... self._variable_metadata = {
... "example_variable": TCTrackerMetadata(
... properties={
... "standard_name": "example_standard_name",
... "long_name": "Example Long Name",
... "units": "example_units",
... },
... constructs=[<CF CellMethod>],
... )
... }
... self._time_metadata = {
... "calendar": "example_calendar",
... "units": "days since yyyy-mm-dd",
... "start_time": cftime.datetime(
... yyyy, mm, dd, hh, calendar=self.example_calendar
... ),
... "end_time": cftime.datetime(
... yyyy, mm, dd, hh, calendar=self.example_calendar
... ),
... }
"""
[docs]
def run_tracker_subprocess( # noqa: PLR0912, PLR0913
self,
command_name: str,
command_list: list[str],
input_file: str | None = None,
input_str: str | None = None,
cwd: str | None = None,
verbosity: int = 1,
) -> dict:
"""Run a subprocess command for a cyclone tracking algorithm.
Parameters
----------
command_name : str
The name of the command, used in logging and error messages.
command_list : list[str]
The command and its arguments to execute.
input_file : str | None
Path to a file to pass to the process via stdin (e.g. a namelist).
Cannot be used together with input_str. Defaults to None.
input_str : str | None
A string to pass to the process via stdin (e.g. interactive inputs).
Cannot be used together with input_file. Defaults to None.
cwd : str | None
Working directory in which to execute the command. Defaults to None.
verbosity : int
Controls how much output is shown:
0 = No output gets printed.
1 = summary, first and last 12 lines printed (default).
2 = Entire output is streamed in real-time.
Defaults to 1.
Returns
-------
dict
Dictionary of subprocess output to 'stdout', 'stderr', and 'returncode'.
Raises
------
ValueError
If both input_file and input_str are provided simultaneously.
ValueError
If verbosity is not 0, 1, or 2.
"""
stdin_context: Union[IO, AbstractContextManager]
if input_file and input_str:
msg = "Please provide either input_file or input_str, not both."
raise ValueError(msg)
if not command_list:
msg = "command_list cannot be empty"
raise ValueError(msg)
if verbosity not in (0, 1, 2):
msg = "Verbosity must be 0, 1, or 2."
raise ValueError(msg)
if verbosity != 0:
print(f"Executing {command_name}...")
if input_file is not None:
stdin_context = open(input_file, "r") # noqa: SIM115
elif verbosity == 2 and input_str is not None: # noqa: PLR2004
stdin_context = tempfile.TemporaryFile(mode="w+") # noqa: SIM115
else:
stdin_context = nullcontext(None)
stdin_file = None
try:
with stdin_context as stdin_file:
if verbosity == 2 and input_str is not None and stdin_file is not None: # noqa: PLR2004
stdin_file.write(input_str)
stdin_file.seek(0)
if verbosity == 2: # noqa: PLR2004
process = subprocess.Popen( # noqa: S603
command_list,
stdin=stdin_file,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
shell=False,
bufsize=1,
cwd=cwd,
)
stdout_lines = []
for line in iter(process.stdout.readline, ""): # type: ignore[union-attr]
print(line, end="")
stdout_lines.append(line)
stdout = "".join(stdout_lines)
_, stderr = process.communicate()
returncode = process.returncode
if returncode != 0:
msg = (
f"{command_name} failed with a non-zero exit code: "
f"{returncode}:\n{stderr}"
)
raise RuntimeError(msg)
else:
result = subprocess.run( # noqa: S603
command_list,
stdin=stdin_file,
input=input_str,
check=True,
capture_output=True,
text=True,
cwd=cwd,
)
stdout, stderr, returncode = (
result.stdout,
result.stderr,
result.returncode,
)
if verbosity == 1:
print(f"{command_name} completed successfully.")
print(
f"First 12 lines of output:\n"
f"{''.join(stdout.splitlines(True)[:12])}"
f"\n...\n\n"
f"Last 12 lines of output:\n"
f"{''.join(stdout.splitlines(True)[-12:])}"
)
return {"stdout": stdout, "stderr": stderr, "returncode": returncode}
except FileNotFoundError as exc:
msg = (
f"{command_name} failed because the executable could not be found.\n"
"Did you provide the full executable path or add it to $PATH?\n"
)
raise FileNotFoundError(msg) from exc
except subprocess.CalledProcessError as exc:
msg = (
f"{command_name} failed with a non-zero exit code: "
f"({exc.returncode}):\n{exc.stderr}"
)
raise RuntimeError(msg) from exc
[docs]
@abstractmethod
def read_trajectories(self) -> list[Trajectory]:
"""
Parse tracking algorithm outputs into list of :class:`tctrack.core.Trajectory`.

Implementation is deferred to the specific tracking algorithm.
For compatibility elsewhere in TCTrack, the data of each trajectory must contain
at least ``time``, ``lat``, and ``lon``; :meth:`to_netcdf` raises a ``ValueError``
if any of these keys is missing.

Returns
-------
list[Trajectory]
A list of :class:`tctrack.core.Trajectory` objects.
"""
[docs]
def to_netcdf(self, output_file: str) -> None:  # noqa: PLR0915, PLR0912
    """
    Write track trajectories to CF-compliant NetCDF trajectory file.

    Reads in trajectories based on the parameters set for a specific implementation
    of the class and writes them to a CF-Conventions compliant NetCDF trajectory
    file using cf-python.
    The data of each trajectory must contain at least ``time``, ``lat``, and
    ``lon``. One field is written for every other variable found in the first
    trajectory; all trajectories are assumed to contain the same variables.

    An ancillary field variable is added to the output file indicating any tracks
    that start/end within 1 day of the input dataset boundaries.

    Parameters
    ----------
    output_file: str
        filename for the output netCDF file
        Note: This will be placed in the local directory unless a full path is given

    Raises
    ------
    ValueError
        If any trajectory is missing one of ``time``, ``lat``, or ``lon``.

    Warnings
    --------
    UserWarning
        If there are no trajectories read in from the tracker outputs by
        :meth:`read_trajectories()` meaning no NetCDF output file can be written.

    References
    ----------
    `CF-Conventions v1.11 - H.4. Trajectory Data <https://cfconventions.org/Data/cf-conventions/cf-conventions-1.11/cf-conventions.html#trajectory-data>`_
    `cf-python documentation <https://ncas-cms.github.io/cf-python/index.html>`_

    Examples
    --------
    Instantiate a :class:`TCTracker` subclass instance with appropriate parameters,
    run the relevant methods to generate cyclone track trajectories, and then save
    the results to a CF-compliant trajectory file:

    >>> my_tracker = MyTracker(...)
    >>> ...
    >>> my_tracker.to_netcdf("my_netcdf_file.nc")
    """
    # Set the metadata if not done already.
    # ``_global_metadata`` has no class-level default, so it is probed with
    # getattr: a plain attribute read raises AttributeError while it is unset.
    if (
        not self._variable_metadata
        or not getattr(self, "_global_metadata", None)
        or not self._time_metadata
    ):
        self.set_metadata()

    # Read in the trajectories generated by the tracker implementation
    trajectories = self.read_trajectories()
    if not trajectories:
        msg = (
            "There are no trajectories in this period so no output file will be "
            "written."
        )
        warnings.warn(msg, category=UserWarning, stacklevel=3)
        return

    # Validate that each trajectory contains the required keys and
    # check for trajectories starting and ending on file boundaries
    required_keys = {"time", "lat", "lon"}
    num_trajectories = len(trajectories)
    starting_trajectory = [False] * num_trajectories
    ending_trajectory = [False] * num_trajectories
    boundary_window = timedelta(days=1)
    for i, trajectory in enumerate(trajectories):
        missing_keys = required_keys - trajectory.data.keys()
        if missing_keys:
            # Sorted so that the message is the same on every run.
            errmsg = (
                f"Trajectory {i} is missing required keys: "
                f"{', '.join(sorted(missing_keys))}"
            )
            raise ValueError(errmsg)

        # Check for trajectories starting and ending within a day of file boundaries
        times = trajectory.data["time"]
        if (times[0] - self.time_metadata["start_time"]) <= boundary_window:
            starting_trajectory[i] = True
        if (self.time_metadata["end_time"] - times[-1]) <= boundary_window:
            ending_trajectory[i] = True

    start_field = cf.FieldAncillary(
        data=starting_trajectory,
        properties={
            "standard_name": "status_flag",
            "long_name": "Trajectory starting at start of dataset flag.",
        },
    )
    start_field.nc_set_variable("start_flag")
    end_field = cf.FieldAncillary(
        data=ending_trajectory,
        properties={
            "standard_name": "status_flag",
            "long_name": "Trajectory finishing at end of dataset flag.",
        },
    )
    end_field.nc_set_variable("end_flag")

    # Determine dimensions
    max_obs = max(trajectory.observations for trajectory in trajectories)

    def padded_rows(key: str) -> list:
        """Return each trajectory's ``key`` values padded with None to max_obs."""
        return [
            trajectory.data[key] + [None] * (max_obs - trajectory.observations)
            for trajectory in trajectories
        ]

    # Define domain axes and coords based on number of trajectories and max lengths
    domain_axis_traj = cf.DomainAxis(size=num_trajectories)
    domain_axis_obs = cf.DomainAxis(size=max_obs)
    domain_axis_traj.nc_set_dimension("trajectory")
    domain_axis_obs.nc_set_dimension("observation")
    dim_traj = cf.DimensionCoordinate(
        data=cf.Data(range(num_trajectories)),
        properties={
            "standard_name": "trajectory",
            "cf_role": "trajectory_id",
            "long_name": "trajectory index",
        },
    )
    dim_obs = cf.DimensionCoordinate(
        data=cf.Data(range(max_obs)),
        properties={
            "standard_name": "observation",
            "long_name": "observation index",
        },
    )

    # Create auxiliary coordinates for time, latitude, longitude
    # Convert time from cftime to num format to write out via cf
    time_metadata = self.time_metadata
    time_units = time_metadata["units"]
    time_calendar = time_metadata["calendar"]
    time_fill = -1e8
    time_data = cf.Data(
        [
            date2num(
                trajectory.data["time"],
                units=time_units,
                calendar=time_calendar,
            ).tolist()
            + [time_fill] * (max_obs - trajectory.observations)
            for trajectory in trajectories
        ],
        fill_value=time_fill,
    )
    time_coord = cf.AuxiliaryCoordinate(
        data=time_data,
        properties={
            "standard_name": "time",
            "long_name": "time",
            "units": cf.Units(time_units, calendar=time_calendar),
            "missing_value": time_fill,
        },
    )

    lat_lon_fill = -999.9
    lat_coord = cf.AuxiliaryCoordinate(
        data=cf.Data(padded_rows("lat"), fill_value=lat_lon_fill),
        properties={
            "standard_name": "latitude",
            "long_name": "latitude",
            "units": "degrees_north",
            "missing_value": lat_lon_fill,
        },
    )
    lon_coord = cf.AuxiliaryCoordinate(
        data=cf.Data(padded_rows("lon"), fill_value=lat_lon_fill),
        properties={
            "standard_name": "longitude",
            "long_name": "longitude",
            "units": "degrees_east",
            "missing_value": lat_lon_fill,
        },
    )

    # Create a cf.Field for each non-coordinate variable in Track.data
    # Assumes all trajectories contain the same variables
    # (named ``output_fields`` so the ``dataclasses.fields`` import is not shadowed)
    field_fill = -1e10
    output_fields = []
    for variable in trajectories[0].data:
        if variable in required_keys:
            continue

        # Define the variable metadata. The properties are copied so that adding
        # ``featureType`` does not modify the metadata stored on the tracker.
        metadata = self.variable_metadata.get(variable, TCTrackerMetadata({}))
        properties = {**metadata.properties, "featureType": "trajectory"}
        field = cf.Field(properties=properties)

        # By default cf-python sets variable name as the standard name.
        # If there is no standard name in the metadata for this variable we set
        # it manually to something meaningful (cf-python default is `data_n`).
        if "standard_name" not in properties:
            field.nc_set_variable(variable)

        # Add any metadata constructs
        if metadata.constructs:
            for ic, construct in enumerate(metadata.constructs):
                kwargs = {}
                if metadata.construct_kwargs:
                    kwargs = metadata.construct_kwargs[ic]
                field.set_construct(construct, **kwargs)

        # Add the axes / coordinates
        axis_traj = field.set_construct(domain_axis_traj)
        axis_obs = field.set_construct(domain_axis_obs)
        field.set_construct(dim_traj, axes=(axis_traj,))
        field.set_construct(dim_obs, axes=(axis_obs,))
        field.set_construct(time_coord, axes=(axis_traj, axis_obs))
        field.set_construct(lat_coord, axes=(axis_traj, axis_obs))
        field.set_construct(lon_coord, axes=(axis_traj, axis_obs))
        # One-element tuples, matching the other set_construct calls
        field.set_construct(start_field, axes=(axis_traj,))
        field.set_construct(end_field, axes=(axis_traj,))

        # Add the variable data to the field
        variable_data = cf.Data(padded_rows(variable), fill_value=field_fill)
        field.set_data(variable_data, axes=(axis_traj, axis_obs))
        field.set_property("missing_value", field_fill)

        # Add the global metadata
        if self.global_metadata:
            field.nc_set_global_attributes(self.global_metadata)

        output_fields.append(field)

    # Write to file
    cf.write(output_fields, output_file)  # type: ignore[operator]
[docs]
@abstractmethod
def run_tracker(self, output_file: str) -> None:
"""
Run the tracker to obtain tropical cyclone track trajectories as NetCDF file.

Implementation is deferred to the specific tracking algorithm.
This should first run any relevant methods for generating cyclone trajectories
from the input data files.
The trajectories output is then saved as a CF-compliant trajectory netCDF file
by calling the :meth:`to_netcdf()` method of this class.

Parameters
----------
output_file : str
Filename to which the tropical cyclone trajectories are saved.
"""