Source code for tctrack.core.tracker

"""Module providing abstract base classes for creating specific trackers."""

import importlib.metadata
import json
import subprocess
import tempfile
import warnings
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager, nullcontext
from dataclasses import asdict, dataclass, fields
from datetime import timedelta
from typing import IO, TypedDict, Union

import cf
from cftime import date2num, datetime

from .trajectory import Trajectory


@dataclass
class TCTrackerParameters:
    """
    Base Data Class for containing parameters of TCTracker classes.

    Parameter sets for a specific algorithm should subclass this base class, and
    the child class should be annotated as a dataclass using
    ``@dataclass(repr=False)`` so that the representation defined here is kept.

    Examples
    --------
    >>> from dataclasses import dataclass
    >>> from tctrack.core import TCTrackerParameters
    >>>
    >>> @dataclass(repr=False)
    ... class MyTrackerParameters(TCTrackerParameters):
    ...     param_a: int
    ...     param_b: str
    >>>
    >>> params = MyTrackerParameters(param_a=1, param_b="example")
    >>>
    >>> print(params)
    MyTrackerParameters(
        param_a      = 1
        param_b      = example
    )
    """

    def __repr__(self) -> str:
        """Return a multi-line listing with one ``name = value`` entry per field."""
        entries = []
        for spec in fields(self):
            value = getattr(self, spec.name)
            entries.append(f"{spec.name} \t = {value}")
        listing = "\n\t".join(entries)
        return f"{type(self).__name__}(\n\t{listing}\n)"

    def __str__(self) -> str:
        """Return the same text as :meth:`__repr__`."""
        return repr(self)
class TCTrackerTimeMetadata(TypedDict):
    """Typed dictionary holding the time metadata of a dataset used by a Tracker."""

    calendar: str
    """Calendar type, given as a string."""

    units: str
    """Calendar units, given as a string."""

    start_time: datetime
    """Start time of the processed data, as a ``cftime.datetime`` object."""

    end_time: datetime
    """Final time of the processed data, as a ``cftime.datetime`` object."""
def is_typed_dict_instance(data, typed_dict_class) -> bool:
    """Return True when ``data`` is a dict holding every required key of the TypedDict.

    Only the presence of the keys listed in ``typed_dict_class.__required_keys__``
    is checked; the types of the values are not inspected.
    """
    if not isinstance(data, dict):
        return False
    for required_key in typed_dict_class.__required_keys__:
        if required_key not in data:
            return False
    return True
[docs] @dataclass class TCTrackerMetadata: """Dataclass containing the metadata for a single variable in variable_metadata.""" properties: dict[str, str] """The basic metadata properties for the variable.""" constructs: list | None = None """A list of any CF constructs to add, such as :class:`cf.CellMethod` constructs.""" construct_kwargs: list[dict] | None = None """ A list of kwargs (as dicts) to use when the :meth:`cf.Field.set_construct` method is called in :meth:`TCTracker.to_netcdf`. This can be left as ``None``, otherwise it must be the same length as :attr:`constructs`. """ def __post_init__(self): """Ensure both constructs and construct_kwargs are the same length.""" if ( self.constructs and self.construct_kwargs and len(self.constructs) != len(self.construct_kwargs) ): msg = ( "'constructs' and 'construct_kwargs' have mismatched lengths " f"(got {len(self.constructs)} and {len(self.construct_kwargs)})" ) raise ValueError(msg)
class TCTracker(ABC):
    """
    Abstract Base Class representing a generic TCTracker class.

    Attributes
    ----------
    _variable_metadata : dict[str, TCTrackerMetadata] | None
        A dictionary containing metadata for variables.
        This attribute must be initialized by the subclass through the
        :meth:`_set_metadata` method, prior to which it is initialised as ``None``.
    _time_metadata : TCTrackerTimeMetadata | None
        A dictionary containing metadata for times including calendar, units, and
        start and end times of the dataset.
        This attribute must be initialized by the subclass through the
        :meth:`_set_metadata` method, prior to which it is initialised as ``None``.
    _global_metadata : dict[str, str]
        A dictionary containing global metadata about the data and TCTrack
        parameters. This will be populated by the base class, but additional
        subclass-specific metadata can be added using the :meth:`_set_metadata`
        method. It is only annotated here, so the attribute does not exist until
        :meth:`set_metadata` has run.
    """

    # Private attributes
    _variable_metadata: dict[str, TCTrackerMetadata] | None = None
    _time_metadata: TCTrackerTimeMetadata | None = None
    _global_metadata: dict[str, str]

    @property
    @abstractmethod
    def _parameters(self) -> list[TCTrackerParameters]:
        """A list of the parameter objects that is accessible from the base class."""
        return []

    @property
    def variable_metadata(self) -> dict:
        """
        dict: Read-only property containing NetCDF metadata for variables.

        Raises
        ------
        AttributeError
            If `_variable_metadata` has not been initialized (is ``None`` or empty).
        """
        if not self._variable_metadata:
            err_msg = "_variable_metadata has not been initialized."
            raise AttributeError(err_msg)
        return self._variable_metadata

    @property
    def time_metadata(self) -> TCTrackerTimeMetadata:
        """
        dict: Read-only property containing time metadata for this Tracker run.

        Raises
        ------
        AttributeError
            If `_time_metadata` has not been initialized (is ``None`` or empty).
        TypeError
            If `_time_metadata` is not a dict containing every required key of
            :class:`TCTrackerTimeMetadata`.
        """
        if not self._time_metadata:
            err_msg = "_time_metadata has not been initialized."
            raise AttributeError(err_msg)

        if not is_typed_dict_instance(self._time_metadata, TCTrackerTimeMetadata):
            err_msg = (
                "_time_metadata does not conform to the expected format of "
                "`TCTrackerTimeMetadata`."
            )
            raise TypeError(err_msg)

        return self._time_metadata

    @property
    def global_metadata(self) -> dict:
        """
        dict: Read-only property containing metadata for tctrack / tracker instance.

        Raises
        ------
        AttributeError
            If `_global_metadata` has not been initialized.
        """
        # `_global_metadata` is annotation-only on the class, so test for existence.
        if not hasattr(self, "_global_metadata"):
            err_msg = "_global_metadata has not been initialized."
            raise AttributeError(err_msg)
        return self._global_metadata

    @abstractmethod
    def _set_metadata(self) -> None:
        """Abstract method to initialize subclass-specific metadata.

        This method must be implemented by subclasses to populate the
        :attr:`_variable_metadata` attribute with relevant metadata for variables
        and :attr:`_time_metadata` with metadata about the time for the dataset.

        This will be called from the :meth:`set_metadata` method.

        Notes
        -----
        The :attr:`_variable_metadata` attribute is expected to be a dictionary
        where keys are variable names and values are instances of
        :class:`TCTrackerMetadata` containing the metadata for that variable
        (e.g., `standard_name`, `long_name`, `units`).

        The :attr:`_time_metadata` attribute is expected to be a typed dictionary
        of the :class:`TCTrackerTimeMetadata` form containing the calendar type and
        units of the dataset, as well as the start and end times.

        The :attr:`_global_metadata` attribute is defined in :meth:`set_metadata`.
        Additional key-value pairs can be added, but it should not be overwritten.

        Examples
        --------
        >>> class MyTracker(TCTracker):
        ...     def _set_metadata(self):
        ...         self._variable_metadata = {
        ...             "example_variable": TCTrackerMetadata(
        ...                 properties={
        ...                     "standard_name": "example_standard_name",
        ...                     "long_name": "Example Long Name",
        ...                     "units": "example_units",
        ...                 },
        ...                 constructs=[<CF CellMethod>],
        ...             )
        ...         }
        ...         self._time_metadata = {
        ...             "calendar": "example_calendar",
        ...             "units": "days since yyyy-mm-dd",
        ...             "start_time": cftime.datetime(
        ...                 yyyy, mm, dd, hh, calendar=self.example_calendar
        ...             ),
        ...             "end_time": cftime.datetime(
        ...                 yyyy, mm, dd, hh, calendar=self.example_calendar
        ...             ),
        ...         }
        """

    def set_metadata(self) -> None:
        """Initialise the metadata attributes used in the CF-netcdf output.

        This is called from the :meth:`to_netcdf` method.

        This sets the :attr:`_global_metadata` attribute with the TCTrack
        parameters and name of the tracker. It then calls the subclass-specific
        :meth:`_set_metadata` to set :attr:`_variable_metadata` and
        :attr:`_time_metadata`.
        """
        # Create a two-level dictionary containing the parameters
        parameters = {}
        for parameter_obj in self._parameters:
            parameter_class = type(parameter_obj).__name__
            parameters[parameter_class] = asdict(parameter_obj)

        # Store the global metadata, including the parameters in a json format
        self._global_metadata = {
            "tctrack_version": importlib.metadata.version("tctrack"),
            "tctrack_tracker": type(self).__name__,
            "tctrack_parameters": json.dumps(parameters),
        }

        self._set_metadata()

    def run_tracker_subprocess(  # noqa: PLR0912, PLR0913
        self,
        command_name: str,
        command_list: list[str],
        input_file: str | None = None,
        input_str: str | None = None,
        cwd: str | None = None,
        verbosity: int = 1,
    ) -> dict:
        """Run a subprocess command for a cyclone tracking algorithm.

        Parameters
        ----------
        command_name : str
            The name of the command, used in logging and error messages.
        command_list : list[str]
            The command and its arguments to execute.
        input_file : str | None
            Path to a file to pass to the process via stdin (e.g. a namelist).
            Cannot be used together with input_str.
            Defaults to None.
        input_str : str | None
            A string to pass to the process via stdin (e.g. interactive inputs).
            Cannot be used together with input_file.
            Defaults to None.
        cwd : str | None
            Working directory in which to execute the command.
            Defaults to None.
        verbosity : int
            Controls how much output is shown:
            0 = No output gets printed.
            1 = summary, first and last 12 lines printed (default).
            2 = Entire output is streamed in real-time.
            Defaults to 1.

        Returns
        -------
        dict
            Dictionary of subprocess output to 'stdout', 'stderr', and 'returncode'.

        Raises
        ------
        ValueError
            If both input_file and input_str are provided simultaneously.
        ValueError
            If command_list is empty.
        ValueError
            If verbosity is not 0, 1, or 2.
        FileNotFoundError
            If a ``FileNotFoundError`` is raised while launching the command; it
            is re-raised with a message about the executable not being found.
        RuntimeError
            If the command exits with a non-zero return code.
        """
        stdin_context: Union[IO, AbstractContextManager]

        # Compare against None (as the branches below do) so that an empty string
        # still counts as "provided" and is rejected here with a clear message.
        if input_file is not None and input_str is not None:
            msg = "Please provide either input_file or input_str, not both."
            raise ValueError(msg)

        if not command_list:
            msg = "command_list cannot be empty"
            raise ValueError(msg)

        if verbosity not in (0, 1, 2):
            msg = "Verbosity must be 0, 1, or 2."
            raise ValueError(msg)

        if verbosity != 0:
            print(f"Executing {command_name}...")

        if input_file is not None:
            stdin_context = open(input_file, "r")  # noqa: SIM115
        elif verbosity == 2 and input_str is not None:  # noqa: PLR2004
            # Popen has no `input=` argument, so the string is staged in a file.
            stdin_context = tempfile.TemporaryFile(mode="w+")  # noqa: SIM115
        else:
            stdin_context = nullcontext(None)

        try:
            with stdin_context as stdin_file:
                if verbosity == 2 and input_str is not None and stdin_file is not None:  # noqa: PLR2004
                    stdin_file.write(input_str)
                    stdin_file.seek(0)

                if verbosity == 2:  # noqa: PLR2004
                    # stderr is spooled to a temporary file rather than a pipe:
                    # nothing drains a stderr pipe while stdout is being streamed,
                    # so a child writing a lot to stderr could fill the pipe and
                    # block forever.
                    with tempfile.TemporaryFile(mode="w+") as stderr_file:
                        # The context manager closes the stdout pipe and waits for
                        # the child, even if streaming is interrupted.
                        with subprocess.Popen(  # noqa: S603
                            command_list,
                            stdin=stdin_file,
                            stdout=subprocess.PIPE,
                            stderr=stderr_file,
                            text=True,
                            shell=False,
                            bufsize=1,
                            cwd=cwd,
                        ) as process:
                            stdout_lines = []
                            for line in iter(process.stdout.readline, ""):  # type: ignore[union-attr]
                                print(line, end="")
                                stdout_lines.append(line)

                        stdout = "".join(stdout_lines)
                        returncode = process.returncode
                        stderr_file.seek(0)
                        stderr = stderr_file.read()

                    if returncode != 0:
                        msg = (
                            f"{command_name} failed with a non-zero exit code: "
                            f"{returncode}:\n{stderr}"
                        )
                        raise RuntimeError(msg)
                else:
                    result = subprocess.run(  # noqa: S603
                        command_list,
                        stdin=stdin_file,
                        input=input_str,
                        check=True,
                        capture_output=True,
                        text=True,
                        cwd=cwd,
                    )
                    stdout, stderr, returncode = (
                        result.stdout,
                        result.stderr,
                        result.returncode,
                    )

                    if verbosity == 1:
                        output_lines = stdout.splitlines(True)
                        print(f"{command_name} completed successfully.")
                        print(
                            f"First 12 lines of output:\n"
                            f"{''.join(output_lines[:12])}"
                            f"\n...\n\n"
                            f"Last 12 lines of output:\n"
                            f"{''.join(output_lines[-12:])}"
                        )

                return {"stdout": stdout, "stderr": stderr, "returncode": returncode}

        except FileNotFoundError as exc:
            msg = (
                f"{command_name} failed because the executable could not be found.\n"
                "Did you provide the full executable path or add it to $PATH?\n"
            )
            raise FileNotFoundError(msg) from exc
        except subprocess.CalledProcessError as exc:
            msg = (
                f"{command_name} failed with a non-zero exit code: "
                f"({exc.returncode}):\n{exc.stderr}"
            )
            raise RuntimeError(msg) from exc

    @abstractmethod
    def read_trajectories(self) -> list[Trajectory]:
        """
        Parse tracking algorithm outputs into list of :class:`tctrack.core.Trajectory`.

        Implementation is deferred to the specific tracking algorithm.

        :meth:`to_netcdf` requires the ``data`` of every returned trajectory to
        contain, as a minimum, the keys ``time``, ``lat``, and ``lon``.

        Returns
        -------
        list[Trajectory]
            A list of :class:`tctrack.core.Trajectory` objects.
        """

    def to_netcdf(self, output_file: str) -> None:  # noqa: PLR0915, PLR0912
        """
        Write track trajectories to CF-compliant NetCDF trajectory file.

        Reads in trajectories based on the parameters set for a specific
        implementation of the class and writes them to a CF-Conventions compliant
        NetCDF trajectory file using cf-python.

        Trajectories are required to contain as a minimum data for ``time``,
        ``lat``, and ``lon``.

        An ancillary field variable is added to the output file indicating any
        tracks that start/end within 1 day of the input dataset boundaries.

        Parameters
        ----------
        output_file: str
            filename for the output netCDF file
            Note: This will be placed in the local directory unless a full path
            is given

        Raises
        ------
        ValueError
            If any trajectory is missing one of the ``time``, ``lat`` or ``lon``
            keys in its data.

        Warnings
        --------
        UserWarning
            If there are no trajectories read in from the tracker outputs by
            :meth:`read_trajectories()` meaning no NetCDF output file can be
            written.

        References
        ----------
        `CF-Conventions v1.11 - H.4. Trajectory Data <https://cfconventions.org/Data/cf-conventions/cf-conventions-1.11/cf-conventions.html#trajectory-data>`_
        `cf-python documentation <https://ncas-cms.github.io/cf-python/index.html>`_

        Examples
        --------
        Instantiate a :class:`TCTracker` subclass instance with appropriate
        parameters, run the relevant methods to generate cyclone track
        trajectories, and then save the results to a CF-compliant trajectory file:

        >>> my_tracker = TCTracker(...)
        >>> ...
        >>> my_tracker.to_netcdf("my_netcdf_file.nc")
        """
        # Set the metadata if not done already.
        # `_global_metadata` has no class-level default, so it is read with
        # getattr to avoid an AttributeError when the other attributes are set
        # but set_metadata() has not run yet.
        if (
            not self._variable_metadata
            or not getattr(self, "_global_metadata", None)
            or not self._time_metadata
        ):
            self.set_metadata()

        # Read in the trajectories generated by the tracker implementation
        # Ensure they contain assumed variables lon, lat, time
        trajectories = self.read_trajectories()
        if len(trajectories) == 0:
            msg = (
                "There are no trajectories in this period so no output file will be "
                "written."
            )
            warnings.warn(msg, category=UserWarning, stacklevel=3)
            return

        # Validate that each trajectory contains the required keys and
        # check for trajectories starting and ending on file boundaries
        required_keys = {"time", "lat", "lon"}
        starting_trajectory = [False] * len(trajectories)
        ending_trajectory = [False] * len(trajectories)
        for i, trajectory in enumerate(trajectories):
            missing_keys = required_keys - trajectory.data.keys()
            if missing_keys:
                # Sorted so the message is deterministic.
                errmsg = (
                    f"Trajectory {i} is missing required keys: "
                    f"{', '.join(sorted(missing_keys))}"
                )
                raise ValueError(errmsg)

            # Check for trajectories starting and ending within a day of file boundaries
            if (
                trajectory.data["time"][0] - self.time_metadata["start_time"]
            ) <= timedelta(days=1):
                starting_trajectory[i] = True
            if (
                self.time_metadata["end_time"] - trajectory.data["time"][-1]
            ) <= timedelta(days=1):
                ending_trajectory[i] = True

        start_field = cf.FieldAncillary(
            data=starting_trajectory,
            properties={
                "standard_name": "status_flag",
                "long_name": "Trajectory starting at start of dataset flag.",
            },
        )
        start_field.nc_set_variable("start_flag")
        end_field = cf.FieldAncillary(
            data=ending_trajectory,
            properties={
                "standard_name": "status_flag",
                "long_name": "Trajectory finishing at end of dataset flag.",
            },
        )
        end_field.nc_set_variable("end_flag")

        # Determine dimensions
        num_trajectories = len(trajectories)
        max_obs = max(trajectory.observations for trajectory in trajectories)

        # Define domain axes and coords based on number of trajectories and max lengths
        domain_axis_traj = cf.DomainAxis(size=num_trajectories)
        domain_axis_obs = cf.DomainAxis(size=max_obs)
        domain_axis_traj.nc_set_dimension("trajectory")
        domain_axis_obs.nc_set_dimension("observation")

        dim_traj = cf.DimensionCoordinate(
            data=cf.Data(range(num_trajectories)),
            properties={
                "standard_name": "trajectory",
                "cf_role": "trajectory_id",
                "long_name": "trajectory index",
            },
        )
        dim_obs = cf.DimensionCoordinate(
            data=cf.Data(range(max_obs)),
            properties={
                "standard_name": "observation",
                "long_name": "observation index",
            },
        )

        # Create auxiliary coordinates for time, latitude, longitude
        # Convert time from cftime to num format to write out via cf
        # Trajectories shorter than max_obs are padded with the fill value.
        time_fill = -1e8
        time_data = cf.Data(
            [
                date2num(
                    trajectory.data["time"],
                    units=self.time_metadata["units"],
                    calendar=self.time_metadata["calendar"],
                ).tolist()
                + [time_fill] * (max_obs - trajectory.observations)
                for trajectory in trajectories
            ],
            fill_value=time_fill,
        )
        time_coord = cf.AuxiliaryCoordinate(
            data=time_data,
            properties={
                "standard_name": "time",
                "long_name": "time",
                "units": cf.Units(
                    self.time_metadata["units"], calendar=self.time_metadata["calendar"]
                ),
                "missing_value": time_fill,
            },
        )

        lat_lon_fill = -999.9
        lat_data = cf.Data(
            [
                trajectory.data["lat"] + [None] * (max_obs - trajectory.observations)
                for trajectory in trajectories
            ],
            fill_value=lat_lon_fill,
        )
        lat_coord = cf.AuxiliaryCoordinate(
            data=lat_data,
            properties={
                "standard_name": "latitude",
                "long_name": "latitude",
                "units": "degrees_north",
                "missing_value": lat_lon_fill,
            },
        )
        lon_data = cf.Data(
            [
                trajectory.data["lon"] + [None] * (max_obs - trajectory.observations)
                for trajectory in trajectories
            ],
            fill_value=lat_lon_fill,
        )
        lon_coord = cf.AuxiliaryCoordinate(
            data=lon_data,
            properties={
                "standard_name": "longitude",
                "long_name": "longitude",
                "units": "degrees_east",
                "missing_value": lat_lon_fill,
            },
        )

        # Create a cf.Field for each non-coordinate variable in Track.data
        # Assumes all trajectories contain the same variables
        # (named output_fields so the imported dataclasses.fields is not shadowed)
        output_fields = []
        for variable in trajectories[0].data:
            if variable in {"time", "lat", "lon"}:
                continue

            # Define the variable metadata
            metadata = self.variable_metadata.get(variable, TCTrackerMetadata({}))
            metadata.properties["featureType"] = "trajectory"
            field = cf.Field(properties=metadata.properties)

            # By default cf-python sets variable name as the standard name.
            # If there is no standard name in the metadata for this variable we set
            # it manually to something meaningful (cf-python default is `data_n`).
            if "standard_name" not in metadata.properties:
                field.nc_set_variable(variable)

            # Add any metadata constructs
            if metadata.constructs:
                for ic, construct in enumerate(metadata.constructs):
                    kwargs = {}
                    if metadata.construct_kwargs:
                        kwargs = metadata.construct_kwargs[ic]
                    field.set_construct(construct, **kwargs)

            # Add the axes / coordinates
            axis_traj = field.set_construct(domain_axis_traj)
            axis_obs = field.set_construct(domain_axis_obs)
            field.set_construct(dim_traj, axes=(axis_traj,))
            field.set_construct(dim_obs, axes=(axis_obs,))
            field.set_construct(time_coord, axes=(axis_traj, axis_obs))
            field.set_construct(lat_coord, axes=(axis_traj, axis_obs))
            field.set_construct(lon_coord, axes=(axis_traj, axis_obs))
            # One-element tuples, matching the other set_construct calls.
            field.set_construct(start_field, axes=(axis_traj,))
            field.set_construct(end_field, axes=(axis_traj,))

            field_fill = -1e10
            variable_data = cf.Data(
                [
                    trajectory.data[variable]
                    + [None] * (max_obs - trajectory.observations)
                    for trajectory in trajectories
                ],
                fill_value=field_fill,
            )

            # Add the variable coordinate to the field
            field.set_data(variable_data, axes=(axis_traj, axis_obs))
            field.set_property("missing_value", field_fill)

            # Add the global metadata
            if self.global_metadata:
                field.nc_set_global_attributes(self.global_metadata)

            output_fields.append(field)

        # Write to file
        cf.write(output_fields, output_file)  # type: ignore[operator]

    @abstractmethod
    def run_tracker(self, output_file: str) -> None:
        """
        Run the tracker to obtain tropical cyclone track trajectories as NetCDF file.

        Implementation is deferred to the specific tracking algorithm.

        This should first run any relevant methods for generating cyclone
        trajectories from the input data files. The trajectories output is then
        saved as a CF-compliant trajectory netCDF file by calling the
        :meth:`to_netcdf()` method of this class.

        Arguments
        ---------
        output_file : str
            Filename to which the tropical cyclone trajectories are saved.
        """