Source code for bagofholding.h5.context

from __future__ import annotations

import os
import pathlib
from types import TracebackType
from typing import ClassVar, Literal

import h5py

from bagofholding.exceptions import FileAlreadyOpenError, FileNotOpenError


[docs] class HasH5FileContext: """ A mixin class for context management with an :class:`h5py.File` object. Supports addressing a group inside an HDF5 file by extending the filepath past a recognized file extension (e.g., ``folder/file.h5/group/sub`` refers to the group ``/group/sub`` inside ``folder/file.h5``). This allows storing multiple bags in a single HDF5 file. The set of recognized extensions is controlled by :attr:`file_extensions`. """ libver_str: ClassVar[str] = "latest" file_extensions: ClassVar[tuple[str, ...]] = (".h5", ".hdf5") filepath: pathlib.Path _file: h5py.File | None _context_depth: int _parsed_path: tuple[pathlib.Path, str] | None _working_root: h5py.Group | None @property def file(self) -> h5py.Group: """The bag's working root group. When the bag's filepath has no interior group component, this is the HDF5 file's root group (i.e., the :class:`h5py.File` itself, which is an :class:`h5py.Group`). When the bag is rooted at an interior path, this is the corresponding sub-group. """ if self._file is None: raise FileNotOpenError(f"{self.filepath} is not open; use `open` or `with`") if self._working_root is None: group_path = self.h5_group_path self._working_root = ( self._file if group_path == "/" else self._file[group_path] ) return self._working_root @file.setter def file(self, new_file: h5py.File | None) -> None: self._file = new_file self._working_root = None def _parse_path(self) -> tuple[pathlib.Path, str]: """Split :attr:`filepath` into the filesystem path and an interior group path. Walks up the filepath looking for a component whose suffix matches one of :attr:`file_extensions` or which already exists as a file. Returns the (file path, interior group path) pair. The interior group path is always returned with a leading ``"/"``; ``"/"`` itself indicates no interior path (the bag is rooted at the file root). Cached on first call: :attr:`filepath` is treated as fixed for the lifetime of the bag, and the parse is hot enough during packing that recomputing each access measurably costs stack frames (pathlib's ``relative_to`` is recursive on Python 3.12). """ if self._parsed_path is not None: return self._parsed_path full = self.filepath.absolute() candidate = full while True: if candidate.suffix in self.file_extensions or candidate.is_file(): interior_rel = full.relative_to(candidate) interior = str(interior_rel) if interior in (".", ""): self._parsed_path = (candidate, "/") return self._parsed_path self._parsed_path = (candidate, "/" + interior.replace("\\", "/")) return self._parsed_path if candidate.parent == candidate: self._parsed_path = (self.filepath, "/") return self._parsed_path candidate = candidate.parent @property def h5_file_path(self) -> pathlib.Path: """The filesystem path to the underlying HDF5 file.""" return self._parse_path()[0] @property def h5_group_path(self) -> str: """The interior group path inside the HDF5 file. Returns ``"/"`` when the bag is rooted at the file root. """ return self._parse_path()[1] @property def is_subpath(self) -> bool: """Whether the filepath points inside an HDF5 file rather than at its root.""" return self.h5_group_path != "/"
[docs] def open(self, mode: Literal["r", "r+", "w", "w-", "x", "a"]) -> h5py.Group: if self._file is not None: raise FileAlreadyOpenError(f"The bag at {self.filepath} is already open") file_path, group_path = self._parse_path() self._file = h5py.File(file_path, mode, libver=self.libver_str) if group_path == "/": return self._file if group_path in self._file: return self._file[group_path] if mode == "r": raise KeyError(f"Group {group_path!r} not found in {file_path}") return self._file.create_group(group_path)
def _open_for_write(self, overwrite_existing: bool) -> None: """Open the underlying file and prepare the target group for a fresh write. Combines validating the target, clearing existing data, and opening the file into a single :func:`h5py.File` call so a save touches the file only once. """ file_path, group_path = self._parse_path() if group_path == "/": if os.path.exists(self.filepath): if overwrite_existing and os.path.isfile(self.filepath): os.remove(self.filepath) else: raise FileExistsError( f"{self.filepath} already exists or is not a file." ) self._file = h5py.File(file_path, "w", libver=self.libver_str) return self._file = h5py.File(file_path, "a", libver=self.libver_str) try: self._reject_save_under_existing_bag(group_path) if group_path in self._file: if overwrite_existing: del self._file[group_path] else: raise FileExistsError( f"Group {group_path!r} already exists in {file_path}." ) self._file.create_group(group_path) except BaseException: self.close() raise def _reject_save_under_existing_bag(self, group_path: str) -> None: """Refuse to save into a group that sits below an existing bag. Walks the ancestors of ``group_path`` inside the open file and raises if any of them is itself the root of a stored bag (detected by the presence of a ``qualname`` attribute, which every BagInfo writes). Nesting bags would let the outer bag observe the inner bag's payload through its own metadata, which is not a coherent state. """ assert self._file is not None parts = [p for p in group_path.strip("/").split("/") if p] cur = "" for part in parts[:-1]: cur += "/" + part if cur not in self._file: continue group = self._file[cur] # H5Bag writes its bag info as group attributes; TrieH5Bag instead # writes it inside a `trie_segments` dataset under the group. Either # marker means the group is already the root of a stored bag. if "qualname" in group.attrs or "trie_segments" in group: raise FileExistsError( f"Cannot save a bag at {group_path!r}: an existing bag " f"already lives at the ancestor group {cur!r}." ) def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: self._context_depth -= 1 if self._context_depth == 0: self.close()
[docs] def close(self) -> None: if self._file is not None: self._file.close() self._file = None self._working_root = None
def __del__(self) -> None: self.close()
[docs] @staticmethod def maybe_decode(attr: str | bytes) -> str: return attr if isinstance(attr, str) else attr.decode("utf-8")