diff --git a/README.md b/README.md new file mode 100644 index 0000000..6c3748e --- /dev/null +++ b/README.md @@ -0,0 +1,183 @@ +[![CI](https://github.com/HuttleyLab/scitrack/actions/workflows/testing_develop.yml/badge.svg)](https://github.com/HuttleyLab/scitrack/actions/workflows/testing_develop.yml) +[![coverall](https://coveralls.io/repos/github/GavinHuttley/scitrack/badge.svg?branch=develop)](https://coveralls.io/github/GavinHuttley/scitrack?branch=develop) +[![Using Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) +[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/release/python-310/) + +# About `scitrack` + +One of the critical challenges in scientific analysis is to track all the elements involved. This includes the arguments provided to a specific application (including default values), input data files referenced by those arguments and output data generated by the application. In addition to this, tracking a minimal set of system specific information. + +`scitrack` is a simple package aimed at researchers writing scripts, or more substantial scientific software, to support the tracking of scientific computation. The package provides elementary functionality to support logging. The primary capabilities concern generating checksums on input and output files and facilitating logging of the computational environment. + +## Installing + +``` +$ pip install scitrack +``` + +## `CachingLogger` + +There is a single object provided by `scitrack`, `CachingLogger`. This object is basically a wrapper around the Python standard library `logging` module. On invocation, `CachingLogger` captures basic information regarding the system and the command line call that was made to invoke the application. + +In addition, the class provides convenience methods for logging both the path and the md5 hexdigest checksum [^1] of input/output files. A method is also provided for producing checksums of text data. The latter is useful for the case when data are from a stream or a database, for instance. + +All logging calls are cached until a path for a logfile is provided. The logger can also, optionally, create directories. + +## Simple instantiation of the logger + +Creating the logger. Setting `create_dir=True` means on creation of the logfile, the directory path will be created also. + +```python +from scitrack import CachingLogger +LOGGER = CachingLogger(create_dir=True) +LOGGER.log_file_path = "somedir/some_path.log" +``` + +The last assignment triggers creation of `somedir/some_path.log`. + +> **Warning** +> +> Once set, a loggers `.log_file_path` cannot be changed. + +## Capturing a programs arguments and options + +`scitrack` will write the contents of `sys.argv` to the log file, prefixed by `command_string`. However, this only captures arguments specified on the command line. Tracking the value of optional arguments not specified, which may have default values, is critical to tracking the full command set. Doing this is now easy with the simple statement `LOGGER.log_args()`. The logger can also record the versions of named dependencies. + +Here's one approach to incorporating `scitrack` into a command line application built using the `click` [command line interface library](http://click.pocoo.org/). Below we create a simple `click` app and capture the required and optional argument values. + +> **Note** +> +> `LOGGER.log_args()` should be called immediately after the function definition, or after "true" default values have been configured. + +```python +import click + +from scitrack import CachingLogger + +LOGGER = CachingLogger() + + +@click.command() +@click.option("-i", "--infile", type=click.Path(exists=True)) +@click.option("-t", "--test", is_flag=True, help="Run test.") +def main(infile, test): + # capture the local variables, at this point just provided arguments + LOGGER.log_args() + LOGGER.log_versions("numpy") + LOGGER.input_file(infile) + LOGGER.log_file_path = "some_path.log" + + +if __name__ == "__main__": + main() +``` + +The `CachingLogger.write()` method takes a message and a label. All other logging methods wrap `log_message()`, providing a specific label. For instance, the method `input_file()` writes out two lines in the log. + +- `input_file_path`, the absolute path to the input file +- `input_file_path md5sum`, the hex digest of the file + +`output_file()` behaves analogously. An additional method `text_data()` is useful for other data input/output sources (e.g. records from a database). For this to have value for arbitrary data types requires a systematic approach to ensuring the text conversion is robust across platforms. + +The `log_args()` method captures all local variables within a scope. + +The `log_versions()` method captures the version of the caller's own package, the versions of its currently installed declared dependencies (across `core` and every extras group), and the versions of any additional named packages, e.g. `LOGGER.log_versions(['numpy', 'sklearn'])`. A name supplied via `packages` that is neither installed nor importable raises `PackageNotFoundError`. + +The `log_licenses()` method mirrors `log_versions()` but emits the declared license of each package under the `license` label. The license is resolved from package metadata, preferring the PEP 639 `License-Expression` field over the legacy `License` field; if neither is declared the value is recorded as `UNKNOWN`. A name supplied via `packages` that is not installed raises `PackageNotFoundError`. + +### Some sample output + +``` +2020-05-25 13:32:07 Eratosthenes:98447 INFO system_details : system=Darwin Kernel Version 19.4.0: Wed Mar 4 22:28:40 PST 2020; root:xnu-6153.101.6~15/RELEASE_X86_64 +2020-05-25 13:32:07 Eratosthenes:98447 INFO python : 3.8.2 +2020-05-25 13:32:07 Eratosthenes:98447 INFO user : gavin +2020-05-25 13:32:07 Eratosthenes:98447 INFO command_string : ./demo.py -i /Users/gavin/repos/SciTrack/tests/sample-lf.fasta +2020-05-25 13:32:07 Eratosthenes:98447 INFO params : {'infile': '/Users/gavin/repos/SciTrack/tests/sample-lf.fasta', 'test': False} +2020-05-25 13:32:07 Eratosthenes:98447 INFO version : __main__==None +2020-05-25 13:32:07 Eratosthenes:98447 INFO version : numpy==1.18.4 +2020-05-25 13:32:07 Eratosthenes:98447 INFO input_file_path : /Users/gavin/repos/SciTrack/tests/sample-lf.fasta +2020-05-25 13:32:07 Eratosthenes:98447 INFO input_file_path md5sum : 96eb2c2632bae19eb65ea9224aaafdad +``` + +## Summarising a log file + +`log_summary()` parses a written log file and returns its entries grouped by label, in the order they appear: + +```python +from scitrack import log_summary + +summary = log_summary("some_path.log") +print(summary["input_file_path"]) +# ['/path/to/input1.fasta', '/path/to/input2.fasta'] +print(summary["input_file_path md5sum"]) +# ['96eb2c2632bae19eb65ea9224aaafdad', ...] +``` + +By default only labels emitted by `scitrack` itself are captured: `system_details`, `python`, `user`, `command_string`, `params`, `version`, `license`, `input_file_path`, `output_file_path`, the corresponding ` md5sum` lines, and `misc`. Lines under any other label are skipped silently. Two keyword arguments relax this: + +- `labels=[...]` — opt in to additional, application-specific labels that your code emits via `LOGGER.log_message(msg, label="...")`. +- `all_labels=True` — capture every label encountered in the file. + +### Project-specific summaries + +Because `log_summary()` returns a plain `dict[str, list[str]]`, clients can layer their own reporting on top without re-parsing the file. For example, a pipeline that emits custom `dataset_id` and `accuracy` entries can produce a project-tailored report: + +```python +from scitrack import log_summary + +summary = log_summary( + "path/to/run.log", + labels=["dataset_id", "accuracy"], +) + +print(f"Run by {summary['user'][0]} on Python {summary['python'][0]}") +print(f"Command: {summary['command_string'][0]}") +print(f"Inputs: {len(summary.get('input_file_path', []))}") +print(f"Outputs: {len(summary.get('output_file_path', []))}") +for dataset, accuracy in zip(summary["dataset_id"], summary["accuracy"]): + print(f" {dataset}: accuracy={accuracy}") +``` + +This makes it straightforward to summarise application logs, making it useful for many things including provenance reports for users. + +## Other useful functions + +Two other useful functions are `get_file_hexdigest()` and `get_text_hexdigest()` compute md5sum for files or text. Those can be used to validate the state recorded in the log-file matches results at a later date, e.g. `output_file()` records the path and md5sum of an output file. + +`get_package_licenses(packages)` returns a `{name: license}` mapping for a list of installed packages, raising `PackageNotFoundError` eagerly if any name is not installed. + +## Reporting issues + +Use the project [issue tracker](https://github.com/HuttleyLab/scitrack/issues). + +## For Developers + +The project is managed with [uv](https://docs.astral.sh/uv/) and uses the `hatchling` build backend. Having cloned the repository onto your machine, install `uv` (see the [uv installation guide](https://docs.astral.sh/uv/getting-started/installation/)), then create a development environment with the `dev` dependency group: + +``` +$ cd path/to/cloned/repo +$ uv sync --group dev +``` + +Run the test suite on the host Python: + +``` +$ uv run pytest tests/ +``` + +The test, coverage, type-check, and format sessions are driven by [nox](https://nox.thea.codes/) with the uv backend. For example: + +``` +$ uv run nox -db uv -s test-3.11 +$ uv run nox -db uv -s testcov-3.14 -- --cov-report=term-missing +$ uv run nox -db uv -s type_check-3.11 +$ uv run nox -db uv -s fmt +``` + +Build the sdist and wheel with: + +``` +$ uv build +``` + +[^1]: The hexdigest serves as a unique signature of a files contents. diff --git a/README.rst b/README.rst deleted file mode 100644 index 96085a6..0000000 --- a/README.rst +++ /dev/null @@ -1,165 +0,0 @@ -|CI| |coverall| |Using Ruff| |Python 3.10+| - -.. |CI| image:: https://github.com/HuttleyLab/scitrack/actions/workflows/testing_develop.yml/badge.svg - :target: https://github.com/HuttleyLab/scitrack/actions/workflows/testing_develop.yml - -.. |coverall| image:: https://coveralls.io/repos/github/GavinHuttley/scitrack/badge.svg?branch=develop - :target: https://coveralls.io/github/GavinHuttley/scitrack?branch=develop - -.. |Using Ruff| image:: https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json - :target: https://github.com/astral-sh/ruff - -.. |Python 3.10+| image:: https://img.shields.io/badge/python-3.10+-blue.svg - :target: https://www.python.org/downloads/release/python-310/ - - -################## -About ``scitrack`` -################## - -One of the critical challenges in scientific analysis is to track all the elements involved. This includes the arguments provided to a specific application (including default values), input data files referenced by those arguments and output data generated by the application. In addition to this, tracking a minimal set of system specific information. - -``scitrack`` is a simple package aimed at researchers writing scripts, or more substantial scientific software, to support the tracking of scientific computation. The package provides elementary functionality to support logging. The primary capabilities concern generating checksums on input and output files and facilitating logging of the computational environment. - -To see some projects using ``scitrack``, see the "Used by" link at the top of the `project GitHub page `_. - -********** -Installing -********** - -For the released version:: - - $ pip install scitrack - -For the very latest version:: - - $ pip install git+https://github.com/HuttleyLab/scitrack - -Or clone it:: - - $ git clone git@github.com:HuttleyLab/scitrack.git - -And then install:: - - $ pip install ~/path/to/scitrack - -***************** -``CachingLogger`` -***************** - -There is a single object provided by ``scitrack``, ``CachingLogger``. This object is basically a wrapper around the Python standard library ``logging`` module. On invocation, ``CachingLogger`` captures basic information regarding the system and the command line call that was made to invoke the application. - -In addition, the class provides convenience methods for logging both the path and the md5 hexdigest checksum [1]_ of input/output files. A method is also provided for producing checksums of text data. The latter is useful for the case when data are from a stream or a database, for instance. - -All logging calls are cached until a path for a logfile is provided. The logger can also, optionally, create directories. - -********************************** -Simple instantiation of the logger -********************************** - -Creating the logger. Setting ``create_dir=True`` means on creation of the logfile, the directory path will be created also. - -.. code:: python - - from scitrack import CachingLogger - LOGGER = CachingLogger(create_dir=True) - LOGGER.log_file_path = "somedir/some_path.log" - -The last assignment triggers creation of ``somedir/some_path.log``. - -.. warning:: - - Once set, a loggers ``.log_file_path`` cannot be changed. - -****************************************** -Capturing a programs arguments and options -****************************************** - -``scitrack`` will write the contents of ``sys.argv`` to the log file, prefixed by ``command_string``. However, this only captures arguments specified on the command line. Tracking the value of optional arguments not specified, which may have default values, is critical to tracking the full command set. Doing this is now easy with the simple statement ``LOGGER.log_args()``. The logger can also record the versions of named dependencies. - -Here's one approach to incorporating ``scitrack`` into a command line application built using the ``click`` `command line interface library `_. Below we create a simple ``click`` app and capture the required and optional argument values. - -.. note:: - - ``LOGGER.log_args()`` should be called immediately after the function definition, or after "true" default values have been configured. - -.. code:: python - - import click - - from scitrack import CachingLogger - - LOGGER = CachingLogger() - - - @click.command() - @click.option("-i", "--infile", type=click.Path(exists=True)) - @click.option("-t", "--test", is_flag=True, help="Run test.") - def main(infile, test): - # capture the local variables, at this point just provided arguments - LOGGER.log_args() - LOGGER.log_versions("numpy") - LOGGER.input_file(infile) - LOGGER.log_file_path = "some_path.log" - - - if __name__ == "__main__": - main() - - -The ``CachingLogger.write()`` method takes a message and a label. All other logging methods wrap ``log_message()``, providing a specific label. For instance, the method ``input_file()`` writes out two lines in the log. - -- ``input_file_path``, the absolute path to the intput file -- ``input_file_path md5sum``, the hex digest of the file - -``output_file()`` behaves analogously. An additional method ``text_data()`` is useful for other data input/output sources (e.g. records from a database). For this to have value for arbitrary data types requires a systematic approach to ensuring the text conversion is robust across platforms. - -The ``log_args()`` method captures all local variables within a scope. - -The ``log_versions()`` method captures versions for the current file and that of a list of named packages, e.g. ``LOGGER.log_versions(['numpy', 'sklearn'])``. - - -Some sample output -================== - -:: - - 2020-05-25 13:32:07 Eratosthenes:98447 INFO system_details : system=Darwin Kernel Version 19.4.0: Wed Mar 4 22:28:40 PST 2020; root:xnu-6153.101.6~15/RELEASE_X86_64 - 2020-05-25 13:32:07 Eratosthenes:98447 INFO python : 3.8.2 - 2020-05-25 13:32:07 Eratosthenes:98447 INFO user : gavin - 2020-05-25 13:32:07 Eratosthenes:98447 INFO command_string : ./demo.py -i /Users/gavin/repos/SciTrack/tests/sample-lf.fasta - 2020-05-25 13:32:07 Eratosthenes:98447 INFO params : {'infile': '/Users/gavin/repos/SciTrack/tests/sample-lf.fasta', 'test': False} - 2020-05-25 13:32:07 Eratosthenes:98447 INFO version : __main__==None - 2020-05-25 13:32:07 Eratosthenes:98447 INFO version : numpy==1.18.4 - 2020-05-25 13:32:07 Eratosthenes:98447 INFO input_file_path : /Users/gavin/repos/SciTrack/tests/sample-lf.fasta - 2020-05-25 13:32:07 Eratosthenes:98447 INFO input_file_path md5sum : 96eb2c2632bae19eb65ea9224aaafdad - -********************** -Other useful functions -********************** - -Two other useful functions are ``get_file_hexdigest`` and ``get_text_hexdigest``. - -**************** -Reporting issues -**************** - -Use the project `issue tracker `_. - -************** -For Developers -************** - -We use flit_ for package building. Having cloned the repository onto your machine. Install ``flit``:: - -$ python3 -m pip install flit - -Do a developer install of ``scitrack`` using flit as:: - -$ cd path/to/cloned/repo -$ flit install -s --python `which python` - -.. note:: This installs a symlink into ``site-packages`` of the python identified by ``which python``. - -.. [1] The hexdigest serves as a unique signature of a files contents. -.. _flit: https://flit.readthedocs.io/en/latest/ diff --git a/pyproject.toml b/pyproject.toml index 4c557e7..7e21467 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" name = "scitrack" dynamic = ["version"] description = "Basic logging capabilities to track scientific computations." -readme = "README.rst" +readme = "README.md" license = "BSD-3-Clause" license-files = ["LICENSE"] authors = [{ name = "Gavin Huttley", email = "Gavin.Huttley@anu.edu.au" }] @@ -42,7 +42,7 @@ path = "src/scitrack/__init__.py" packages = ["src/scitrack"] [tool.hatch.build.targets.sdist] -include = ["src/scitrack", "pyproject.toml", "README.rst", "LICENSE"] +include = ["src/scitrack", "pyproject.toml", "README.md", "LICENSE"] exclude = ["**/*.xml", "**/__pycache__"] [tool.mypy] @@ -53,4 +53,12 @@ warn_unused_ignores = true warn_redundant_casts = true warn_unreachable = true +[tool.coverage.run] +source_pkgs = ["scitrack"] + +[tool.coverage.report] +# restrict the report to scitrack; lazily imported stdlib modules +# (zipfile, importlib.metadata) otherwise leak into the output +include = ["*/scitrack/*"] + # Ruff configuration lives in ./ruff.toml diff --git a/ruff.toml b/ruff.toml index c00d2b5..47ee772 100644 --- a/ruff.toml +++ b/ruff.toml @@ -36,7 +36,7 @@ target-version = "py310" # McCabe complexity (`C901`) by default. # G004 is about lazy formatting of logging args, not important here select = ["ALL"] -ignore = ["EXE002", "FA100", "E501", "D", "G004"] +ignore = ["EXE002", "FA100", "E501", "D", "G004", "COM812"] # Allow fix for all enabled rules (when `--fix`) is provided. fixable = ["ALL"] diff --git a/src/scitrack/__init__.py b/src/scitrack/__init__.py index 0b183db..9f39041 100644 --- a/src/scitrack/__init__.py +++ b/src/scitrack/__init__.py @@ -5,13 +5,17 @@ import contextlib import hashlib import importlib +import importlib.metadata import inspect import logging import os import platform +import re import socket import sys import types +from collections.abc import Callable +from enum import Enum from getpass import getuser from pathlib import Path @@ -21,22 +25,112 @@ VERSION_ATTRS = ["__version__", "version", "VERSION"] -def get_package_name(obj: object) -> str: - """returns the package name for the provided object""" - mod = inspect.getmodule(obj) - name = getattr(mod, "__name__", "") - return name.split(".")[0] +class LogLabel(str, Enum): + """labels emitted by scitrack itself. + + Notes + ----- + User code may also pass arbitrary strings + """ + + MISC = "misc" + PARAMS = "params" + VERSION = "version" + LICENSE = "license" + INPUT_FILE = "input_file_path" + OUTPUT_FILE = "output_file_path" + MD5SUM = "md5sum" + SYSTEM_DETAILS = "system_details" + PYTHON = "python" + USER = "user" + COMMAND_STRING = "command_string" + + def __str__(self) -> str: + return str(self.value) + + +def _installed_package_from_globals(g: dict[str, object]) -> str: + """top-level installed distribution name from a frame's globals, or '' + + Notes + ----- + Prefers ``__package__`` over ``__name__``, collapses dotted names + to their top-level component, skips ``__main__``, and verifies the + result is a registered installed distribution. + """ + candidate = g.get("__package__") or g.get("__name__") or "" + if not isinstance(candidate, str): + return "" + top = candidate.split(".")[0] + if not top or top == "__main__": + return "" + try: + importlib.metadata.distribution(top) + except importlib.metadata.PackageNotFoundError: + return "" + return top + + +def get_package_name(obj: object | None = None) -> str: + """returns the top-level package name + + Parameters + ---------- + obj + Any object whose defining module's top-level package name is + wanted. If ``None``, the caller's frame is used instead. + + Returns + ------- + str + The top-level package name, or ``""`` when it cannot be + resolved. + + Notes + ----- + When no object is provided, the package name is inferred from the + caller's frame, and is only returned if that package is installed. + For any non-installed package, ``""`` is returned. + """ + if obj is not None: + mod = inspect.getmodule(obj) + name = getattr(mod, "__name__", "") + return name.split(".")[0] + + frame = inspect.currentframe() + parent = frame.f_back if frame is not None else None + if parent is None: + return "" + return _installed_package_from_globals(parent.f_globals) + + +def _version_via_metadata(name: str) -> str | None: + """resolve version from installed distribution metadata (PEP 566). + + Returns None if the distribution is not installed, the lookup raises + any exception, or the recorded version is empty. + """ + try: + version = importlib.metadata.version(name) + except Exception: # noqa: BLE001 + return None + return version or None def get_version_for_package(package: str | types.ModuleType) -> str | None: """returns the version of package""" if isinstance(package, str): + version = _version_via_metadata(package) + if version is not None: + return version try: mod = importlib.import_module(package) except ModuleNotFoundError as e: - msg = f"Unknown package {package}" - raise ValueError(msg) from e + raise importlib.metadata.PackageNotFoundError(package) from e elif inspect.ismodule(package): + version = _version_via_metadata(package.__name__.split(".")[0]) + if version is not None: + return version mod = package else: msg = f"Unknown type, package {package}" # type: ignore[unreachable] @@ -60,6 +154,229 @@ def get_version_for_package(package: str | types.ModuleType) -> str | None: return vn +_REQ_NAME_RE = re.compile(r"^\s*([A-Za-z0-9][A-Za-z0-9._-]*)") +_EXTRA_CLAUSE_RE = re.compile(r"""^\s*extra\s*==\s*['\"]([^'\"]+)['\"]\s*$""") + + +def _split_requirement(req: str) -> tuple[str, str]: + """split a Requires-Dist entry into (name, marker_text).""" + head, _, marker = req.partition(";") + match = _REQ_NAME_RE.match(head) + name = match.group(1) if match else "" + return name, marker.strip() + + +def _extract_extra(marker: str) -> tuple[str | None, str]: + """find an ``extra == 'X'`` clause and return (extra_name, residual_marker). + + Notes + ----- + The marker is split on top-level ``and`` so an extras clause can be + excised without mangling neighbouring clauses; the residual rejoins + the remaining clauses with ``and``. + """ + if not marker: + return None, marker + clauses = re.split(r"\s+and\s+", marker) + extra_name: str | None = None + residual: list[str] = [] + for clause in clauses: + match = _EXTRA_CLAUSE_RE.match(clause) + if match is not None and extra_name is None: + extra_name = match.group(1) + else: + residual.append(clause) + return extra_name, " and ".join(residual) + + +def _marker_env() -> dict[str, str]: + """current values of supported PEP 508 environment marker variables.""" + impl = sys.implementation + impl_version = f"{impl.version.major}.{impl.version.minor}.{impl.version.micro}" + return { + "python_version": f"{sys.version_info.major}.{sys.version_info.minor}", + "python_full_version": platform.python_version(), + "sys_platform": sys.platform, + "platform_system": platform.system(), + "platform_machine": platform.machine(), + "implementation_name": impl.name, + "implementation_version": impl_version, + "os_name": os.name, + } + + +def _version_tuple(value: str) -> tuple[int, ...]: + """parse a dotted version string into a comparable tuple of ints.""" + return tuple(int(p) for p in value.split(".")) + + +_ATOM_RE = re.compile( + r"""^\s* + ([A-Za-z_][A-Za-z0-9_]*) # variable + \s*(==|!=|<=|>=|<|>)\s* # operator + ['\"]([^'\"]+)['\"] # quoted literal + \s*$ + """, + re.VERBOSE, +) +_VERSION_VARS = frozenset( + {"python_version", "python_full_version", "implementation_version"}, +) + + +def _compare(op: str, a: object, b: object) -> bool: + """apply a comparison operator to two equally-typed operands.""" + if op == "==": + return a == b + if op == "!=": + return a != b + if op == "<": + return bool(a < b) # type: ignore[operator] + if op == "<=": + return bool(a <= b) # type: ignore[operator] + if op == ">": + return bool(a > b) # type: ignore[operator] + return bool(a >= b) # type: ignore[operator] + + +def _eval_atom(atom: str, env: dict[str, str]) -> bool: + """evaluate a single ``var 'value'`` clause.""" + match = _ATOM_RE.match(atom) + if match is None: + msg = f"unparseable marker atom: {atom!r}" + raise ValueError(msg) + var, op, literal = match.group(1), match.group(2), match.group(3) + if var not in env: + msg = f"unsupported marker variable: {var!r}" + raise ValueError(msg) + actual = env[var] + if var in _VERSION_VARS: + return _compare(op, _version_tuple(actual), _version_tuple(literal)) + return _compare(op, actual, literal) + + +def _evaluate_marker(marker: str) -> bool: + """evaluate a residual marker; on any parse failure return True (conservative).""" + if not marker: + return True + try: + env = _marker_env() + or_terms = re.split(r"\s+or\s+", marker) + return any( + all(_eval_atom(atom, env) for atom in re.split(r"\s+and\s+", term)) + for term in or_terms + ) + except (ValueError, KeyError, AttributeError): + return True + + +def get_package_dependencies( + package: str, + *, + if_installed: bool = False, +) -> dict[str, list[str]]: + """returns declared dependencies of an installed package, grouped by install option + + Parameters + ---------- + package + Distribution name to inspect. + if_installed + If ``True``, only return dependencies that are installed in the + current environment. + + Returns + ------- + dict[str, list[str]] + Mapping of install-option to list of dependency package names. + Core (unconditional) deps live under ``"core"``; each ``extra == + 'X'`` group lives under key ``"X"``. Returns ``{}`` when the + package is not installed or declares no dependencies. + + Notes + ----- + Names are stripped of version specifiers, extras and markers. + Non-extra environment markers are evaluated against the current + interpreter; deps whose markers are False are omitted. By default + the install-state of each dependency is not checked; pass + ``if_installed=True`` to filter on it. Within a single call each + distinct dependency name is probed at most once, and any group + that ends up empty after filtering is omitted from the result. + """ + try: + raw = importlib.metadata.requires(package) + except importlib.metadata.PackageNotFoundError: + return {} + if not raw: + return {} + + result: dict[str, list[str]] = {} + installed_cache: dict[str, bool] = {} + for entry in raw: + name, marker = _split_requirement(entry) + if not name: + continue + extra, residual = _extract_extra(marker) + if not _evaluate_marker(residual): + continue + if if_installed: + if name not in installed_cache: + try: + importlib.metadata.distribution(name) + except importlib.metadata.PackageNotFoundError: + installed_cache[name] = False + else: + installed_cache[name] = True + if not installed_cache[name]: + continue + key = extra or "core" + result.setdefault(key, []).append(name) + return result + + +def _license_for_package(package: str) -> str: + """resolve the license string for an installed package. + + Notes + ----- + Prefers PEP 639 ``License-Expression`` over the legacy ``License`` + field; returns ``"UNKNOWN"`` if neither is declared, if both are + empty, or if a field carries the literal sentinel ``"UNKNOWN"`` that + older sdists write into ``PKG-INFO``. Propagates + ``PackageNotFoundError`` from ``importlib.metadata.metadata`` when + the distribution is not installed. + """ + meta = importlib.metadata.metadata(package) + for field in ("License-Expression", "License"): + if field in meta: + value = meta[field] + if value and value.strip().upper() != "UNKNOWN": + return value + return "UNKNOWN" + + +def get_package_licenses(packages: list[str]) -> dict[str, str]: + """returns the declared license of each named installed package + + Parameters + ---------- + packages + Distribution names to look up. + + Returns + ------- + dict[str, str] + Mapping of each name to its license string. Missing license + metadata yields ``"UNKNOWN"``. + + Notes + ----- + Raises ``PackageNotFoundError`` eagerly on the first uninstalled + name, so callers never see a partial result. + """ + return {name: _license_for_package(name) for name in packages} + + class CachingLogger: """stores log messages until a log filename is provided""" @@ -130,47 +447,58 @@ def mode(self, mode: str) -> None: """the logfile file opening mode""" self._mode = mode - def _record_file(self, file_class: str, file_path: str) -> None: + def _record_file(self, file_class: str | LogLabel, file_path: str) -> None: """writes the file path and md5 checksum to log file""" path: Path = Path(file_path).expanduser().resolve(strict=False) md5sum = get_file_hexdigest(path) self.log_message(str(path), label=file_class) - self.log_message(md5sum, label=f"{file_class} md5sum") + self.log_message(md5sum, label=f"{file_class} {LogLabel.MD5SUM}") - def input_file(self, file_path: str, label: str = "input_file_path") -> None: + def input_file( + self, + file_path: str, + label: str | LogLabel = LogLabel.INPUT_FILE, + ) -> None: """logs path and md5 checksum Argument: - label is inserted before the message""" self._record_file(label, file_path) - def output_file(self, file_path: str, label: str = "output_file_path") -> None: + def output_file( + self, + file_path: str, + label: str | LogLabel = LogLabel.OUTPUT_FILE, + ) -> None: """logs path and md5 checksum Argument: - - label is inserted before the message""" + - label is inserted before the message + """ self._record_file(label, file_path) - def text_data(self, data: str, label: str | None = None) -> None: + def text_data(self, data: str, label: str | LogLabel | None = None) -> None: """logs md5 checksum for input text data. Argument: - label is inserted before the message - For this to be useful you must ensure the text order is persistent.""" + For this to be useful you must ensure the text order is persistent. + """ if label is None: msg = "text_data requires a non-None label" raise ValueError(msg) md5sum = get_text_hexdigest(data) self.log_message(md5sum, label=label) - def log_message(self, msg: str, label: str | None = None) -> None: + def log_message(self, msg: str, label: str | LogLabel | None = None) -> None: """writes a log message Argument: - - label is inserted before the message""" - label = label or "misc" - data = [label, msg] + - label is inserted before the message + """ + label = label or LogLabel.MISC + data = [str(label), msg] msg = " : ".join(data) if not self._started or self._logger is None: self._messages.append(msg) @@ -179,9 +507,11 @@ def log_message(self, msg: str, label: str | None = None) -> None: def log_args(self, args: dict[str, object] | None = None) -> None: """save arguments to file using label='params' + Argument: - args: if None, uses inspect module to get locals - from the calling frame""" + from the calling frame + """ if args is None: frame = inspect.currentframe() parent = frame.f_back if frame is not None else None @@ -193,52 +523,134 @@ def log_args(self, args: dict[str, object] | None = None) -> None: if not isinstance(args[k], self.__class__) and not isinstance(args[k], type(importlib)) } - self.log_message(str(result), label="params") + self.log_message(str(result), label=LogLabel.PARAMS) def shutdown(self) -> None: """safely shutdown the logger""" self._reset() - def log_versions(self, packages: list[str] | str | None = None) -> None: - """logs version from the global namespace where - method is invoked, plus from any named packages""" - to_check: list[str | types.ModuleType] = [] - if isinstance(packages, str) or inspect.ismodule(packages): - to_check = [packages] - elif isinstance(packages, (list, tuple)): - to_check.extend(packages) - - for i, p in enumerate(to_check): - if inspect.ismodule(p): - to_check[i] = p.__name__ - - frame: types.FrameType | None = inspect.currentframe() - if frame is None: - return - - parent = frame.f_back - del frame - if parent is None: - return - - g = parent.f_globals - name = g.get("__package__", g.get("__name__", "")) - if name: - vn = get_version_for_package(name) + def _log_metadata( + self, + packages: list[str] | str | types.ModuleType | None, + value_for: Callable[[str], str | None], + label: LogLabel, + *, + accept_modules: bool, + caller_name: str, + ) -> None: + """shared body for ``log_versions``/``log_licenses``. + + Notes + ----- + Unions ``caller_name``'s installed dependencies with ``packages``, + resolves each name via ``value_for``, then emits ``name==value`` + lines under ``label`` with the caller first, the rest + alphabetical. Lookups happen eagerly so a failed resolution aborts + before any line is written. + """ + caller_value: str | None = None + if caller_name: + try: + caller_value = value_for(caller_name) + except importlib.metadata.PackageNotFoundError: + caller_name = "" + + if packages is None: + user_list: list[str | types.ModuleType] = [] + elif isinstance(packages, str) or ( + accept_modules and inspect.ismodule(packages) + ): + user_list = [packages] else: - candidates = [g[v] for v in VERSION_ATTRS if g.get(v, None)] - vn = candidates[0] if candidates else None - name = get_package_name(parent) + user_list = list(packages) + + user_names = { + (p.__name__.split(".")[0] if inspect.ismodule(p) else p) for p in user_list + } + + deps = ( + get_package_dependencies(caller_name, if_installed=True) + if caller_name + else {} + ) + dep_names: set[str] = {n for names in deps.values() for n in names} - versions = [(name, vn)] - for package in to_check: - vn = get_version_for_package(package) - versions.append((package, vn)) + entries: list[tuple[str, str | None]] = [] + if caller_name: + entries.append((caller_name, caller_value)) + for pkg in sorted((dep_names | user_names) - {caller_name}): + entries.append((pkg, value_for(pkg))) - for n_v in versions: - self.log_message("{}=={}".format(*n_v), label="version") + for name, value in entries: + self.log_message(f"{name}=={value}", label=label) - del parent + def log_versions(self, packages: list[str] | str | None = None) -> None: + """logs the caller's package, its installed dependencies, and named packages + + Parameters + ---------- + packages + Additional package names (or imported modules) whose versions + should also be logged. + + Notes + ----- + The caller's package is resolved via ``get_package_name``. When + it is an installed distribution, its declared dependencies + (across ``core`` and every extras group) are fetched via + ``get_package_dependencies(..., if_installed=True)`` so only + currently-installed deps participate. The set of those names is + union-ed with ``packages`` and emitted in alphabetical order + after the caller's own version line. A name in ``packages`` that + is not installed raises ``PackageNotFoundError``. + """ + frame = inspect.currentframe() + parent = frame.f_back if frame is not None else None + caller_name = ( + _installed_package_from_globals(parent.f_globals) + if parent is not None + else "" + ) + self._log_metadata( + packages, + get_version_for_package, + LogLabel.VERSION, + accept_modules=True, + caller_name=caller_name, + ) + + def log_licenses(self, packages: list[str] | str | None = None) -> None: + """logs the caller's package, its installed dependencies, and named packages + + Parameters + ---------- + packages + Additional package names whose licenses should also be logged. + + Notes + ----- + Mirrors ``log_versions``: the caller's installed package is + resolved from frame globals, its installed dependencies are + fetched via ``get_package_dependencies(..., if_installed=True)``, + and that set is union-ed with ``packages`` then emitted in + alphabetical order after the caller's own license line. A name + in ``packages`` that is not installed raises + ``PackageNotFoundError``. + """ + frame = inspect.currentframe() + parent = frame.f_back if frame is not None else None + caller_name = ( + _installed_package_from_globals(parent.f_globals) + if parent is not None + else "" + ) + self._log_metadata( + packages, + _license_for_package, + LogLabel.LICENSE, + accept_modules=False, + caller_name=caller_name, + ) def set_logger( @@ -261,10 +673,10 @@ def set_logger( handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(level) - logger.info(f"system_details : system={platform.version()}") - logger.info(f"python : {platform.python_version()}") - logger.info(f"user : {getuser()}") - logger.info(f"command_string : {' '.join(sys.argv)}") + logger.info(f"{LogLabel.SYSTEM_DETAILS} : system={platform.version()}") + logger.info(f"{LogLabel.PYTHON} : {platform.python_version()}") + logger.info(f"{LogLabel.USER} : {getuser()}") + logger.info(f"{LogLabel.COMMAND_STRING} : {' '.join(sys.argv)}") return handler @@ -310,3 +722,84 @@ def get_text_hexdigest(data: str | bytes) -> str: md5 = hashlib.md5(usedforsecurity=False) md5.update(data_bytes) return md5.hexdigest() + + +def log_summary( + path: str | os.PathLike[str], + *, + labels: list[str] | None = None, + all_labels: bool = False, +) -> dict[str, list[str]]: + """returns logfile entries grouped by label + + Parameters + ---------- + path + The log file path. + labels + Extra labels (beyond the built-in ``LogLabel``) to recognise. + Lines whose label is not in the recognised set are skipped + silently. + all_labels + If ``True``, every label encountered in the file is captured, + not just the recognised set. + + Returns + ------- + dict[str, list[str]] + Mapping of label to the list of values emitted under that label, + in the order they appear in the file. + + Raises + ------ + ValueError + If a reserved label (``datetime``, ``hostname`` or ``os``) is + requested, either via ``labels`` or an ``all_labels`` capture of a + line bearing that label. + + Notes + ----- + The returned mapping also includes reserved keys derived from the log + itself: ``datetime`` (timestamp of the first logged line, that is when + logging started) and ``hostname`` (host that produced the log, from + that line's prefix), both absent only when the file has no parsable + lines, plus ``os`` (operating system, from the ``system_details`` + line) when present. These names are reserved and may not be used as + labels. + """ + reserved = {"datetime", "hostname", "os"} + if labels and (clash := reserved.intersection(labels)): + msg = f"reserved labels cannot be requested: {sorted(clash)}" + raise ValueError(msg) + + recognised: set[str] = {member.value for member in LogLabel} + recognised.add(f"{LogLabel.INPUT_FILE} {LogLabel.MD5SUM}") + recognised.add(f"{LogLabel.OUTPUT_FILE} {LogLabel.MD5SUM}") + if labels: + recognised.update(labels) + + result: dict[str, list[str]] = {} + with Path(path).open(encoding="utf-8") as fh: + for raw in fh: + line = raw.rstrip("\n") + if not line: + continue + try: + ts, hostpid, _level, message = line.split("\t", 3) + except ValueError: + continue + result.setdefault("datetime", [ts]) + result.setdefault("hostname", [hostpid.rsplit(":", 1)[0]]) + try: + label, value = message.split(" : ", 1) + except ValueError: + continue + if not all_labels and label not in recognised: + continue + if label in reserved: + msg = f"'{label}' is a reserved label in log_summary()" + raise ValueError(msg) + result.setdefault(label, []).append(value) + if label == LogLabel.SYSTEM_DETAILS.value: + result.setdefault("os", [value.removeprefix("system=")]) + return result diff --git a/tests/test_logging.py b/tests/test_logging.py index bf33ee6..3932b42 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -1,4 +1,3 @@ -import contextlib import logging import sys from collections import Counter @@ -9,11 +8,15 @@ import scitrack as _scitrack from scitrack import ( CachingLogger, + LogLabel, __version__, get_file_hexdigest, + get_package_dependencies, + get_package_licenses, get_package_name, get_text_hexdigest, get_version_for_package, + log_summary, set_logger, ) @@ -106,16 +109,111 @@ def test_package_inference(): assert name == "scitrack" +def test_get_package_name_no_arg_installed_package(monkeypatch): + # no-arg call returns the caller's installed package name + + class _Parent: + f_globals = {"__package__": "scitrack", "__name__": "scitrack"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + assert get_package_name() == "scitrack" + + +def test_get_package_name_no_arg_subpackage(monkeypatch): + # dotted subpackage collapses to its top-level distribution name + + class _Parent: + f_globals = {"__package__": "scitrack.sub", "__name__": "scitrack.sub"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + assert get_package_name() == "scitrack" + + +def test_get_package_name_no_arg_falls_back_to_name(monkeypatch): + # when __package__ is empty, fall back to __name__ + + class _Parent: + f_globals = {"__package__": "", "__name__": "scitrack"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + assert get_package_name() == "scitrack" + + +def test_get_package_name_no_arg_not_installed(monkeypatch): + # caller's package is not an installed distribution + + class _Parent: + f_globals = { + "__package__": "not_a_real_pkg_xyz", + "__name__": "not_a_real_pkg_xyz", + } + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + assert get_package_name() == "" + + +def test_get_package_name_no_arg_main_script(monkeypatch): + # script run directly (__name__ == "__main__") is not a package + + class _Parent: + f_globals = {"__package__": None, "__name__": "__main__"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + assert get_package_name() == "" + + +def test_get_package_name_no_arg_no_current_frame(monkeypatch): + # inspect.currentframe() returning None yields "" + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: None) + assert get_package_name() == "" + + +def test_get_package_name_no_arg_no_parent_frame(monkeypatch): + # frame.f_back is None (top-of-stack caller) yields "" + + class _Frame: + f_back = None + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + assert get_package_name() == "" + + def test_package_versioning(): """correctly identify versions for specified packages""" + from importlib.metadata import PackageNotFoundError + vn = get_version_for_package("numpy") assert type(vn) is str - with contextlib.suppress(ValueError): + with pytest.raises(PackageNotFoundError, match="gobbledygook"): get_version_for_package("gobbledygook") - with contextlib.suppress(ValueError): + with pytest.raises(ValueError, match="Unknown type"): get_version_for_package(1) +def test_get_version_for_package_not_installed(): + # uninstalled package name -> PackageNotFoundError carrying the name + from importlib.metadata import PackageNotFoundError + + with pytest.raises(PackageNotFoundError, match="definitely_not_installed_xyz"): + get_version_for_package("definitely_not_installed_xyz") + + def test_tracks_versions(logfile): """should track versions""" LOGGER = CachingLogger(create_dir=True) @@ -191,6 +289,46 @@ def test_tracks_versions_string(logfile): assert expect in line, line +def test_log_versions_unknown_package(logfile): + # log_versions on an uninstalled name -> PackageNotFoundError + from importlib.metadata import PackageNotFoundError + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + with pytest.raises(PackageNotFoundError, match="definitely_not_installed_xyz"): + LOGGER.log_versions("definitely_not_installed_xyz") + LOGGER.shutdown() + + +def test_log_versions_partial_list_raises_eagerly(logfile): + # mixed list: bad name aborts before any "version :" line is written + from importlib.metadata import PackageNotFoundError + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + with pytest.raises(PackageNotFoundError, match="definitely_not_installed_xyz"): + LOGGER.log_versions(["numpy", "definitely_not_installed_xyz"]) + LOGGER.shutdown() + assert not any("version :" in line for line in logfile.read_text().splitlines()) + + +def test_log_versions_uninstalled_module_does_not_raise(logfile): + # an imported module with no installed dist -> no raise; version recorded + pyfile = TEST_ROOTDIR / "delme_log.py" + pyfile.write_text("__version__ = 'local-only'\n") + sys.path.append(str(TEST_ROOTDIR)) + import delme_log + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.log_versions(delme_log) + LOGGER.shutdown() + pyfile.unlink() + assert any( + "delme_log==local-only" in line for line in logfile.read_text().splitlines() + ) + + def test_get_version_for_package(): """should track version if package is a module""" import numpy as np @@ -231,6 +369,231 @@ def test_tracks_versions_module(logfile): assert expect in line, line +def test_get_package_dependencies_not_installed(monkeypatch): + # unknown package -> empty dict (never raises) + def fake_requires(name): + raise _scitrack.importlib.metadata.PackageNotFoundError(name) + + monkeypatch.setattr(_scitrack.importlib.metadata, "requires", fake_requires) + assert get_package_dependencies("definitely_not_installed_xyz") == {} + + +@pytest.mark.parametrize("requires_value", [lambda: None, list]) +def test_get_package_dependencies_no_requires(monkeypatch, requires_value): + # installed package with no declared deps -> empty dict + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: requires_value(), + ) + assert get_package_dependencies("scitrack") == {} + + +def test_get_package_dependencies_core_only(monkeypatch): + # unconditional deps land under "core", names stripped of specifiers + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: ["numpy>=1.0", "pandas (>=2.0)"], + ) + assert get_package_dependencies("scitrack") == {"core": ["numpy", "pandas"]} + + +@pytest.mark.parametrize( + "raw", + [ + "numpy", + "numpy>=1.0", + "numpy (>=1.0)", + "numpy[security]>=1.0", + "numpy ; python_version >= '3.0'", + ], +) +def test_get_package_dependencies_strips_specifiers(monkeypatch, raw): + # every surface form of a single requirement collapses to the base name + monkeypatch.setattr(_scitrack.importlib.metadata, "requires", lambda _: [raw]) + assert get_package_dependencies("scitrack") == {"core": ["numpy"]} + + +def test_get_package_dependencies_partitions_extras(monkeypatch): + # extras-gated deps go under per-extra keys; core deps stay under "core" + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: [ + "numpy>=1.0", + "pytest; extra == 'test'", + "sphinx; extra == 'docs'", + ], + ) + assert get_package_dependencies("scitrack") == { + "core": ["numpy"], + "test": ["pytest"], + "docs": ["sphinx"], + } + + +def test_get_package_dependencies_env_marker_drops_false(monkeypatch): + # dep gated by a marker that's false in this env is dropped (no empty core) + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: ["numpy; python_version < '2.0'"], + ) + assert get_package_dependencies("scitrack") == {} + + +def test_get_package_dependencies_env_marker_keeps_true(monkeypatch): + # dep gated by a marker that's true in this env is kept under "core" + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: ["numpy; python_version >= '3.0'"], + ) + assert get_package_dependencies("scitrack") == {"core": ["numpy"]} + + +@pytest.mark.parametrize( + ("marker_tail", "expected"), + [ + ("python_version >= '3.0'", {"test": ["pytest"]}), + ("python_version < '2.0'", {}), + ], +) +def test_get_package_dependencies_extras_with_env_marker( + monkeypatch, + marker_tail, + expected, +): + # extras-gated dep is included in its group only when the residual marker passes + req = f"pytest; extra == 'test' and {marker_tail}" + monkeypatch.setattr(_scitrack.importlib.metadata, "requires", lambda _: [req]) + assert get_package_dependencies("scitrack") == expected + + +def test_get_package_dependencies_unparseable_marker_kept(monkeypatch): + # unparseable marker -> conservative fallback keeps the dep under "core" + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: ["numpy; this is not a real marker"], + ) + assert get_package_dependencies("scitrack") == {"core": ["numpy"]} + + +@pytest.mark.parametrize( + ("marker", "expected"), + [ + ( + "python_version >= '3.0' and extra == 'test' and python_version >= '3.0'", + {"test": ["pytest"]}, + ), + ( + "python_version < '2.0' and extra == 'test' and python_version >= '3.0'", + {}, + ), + ( + "python_version >= '3.0' and extra == 'test' and python_version < '2.0'", + {}, + ), + ], +) +def test_get_package_dependencies_three_clause_extra_middle( + monkeypatch, + marker, + expected, +): + # extra clause embedded in a 3-clause and-chain - residual must rejoin with ' and ' + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: [f"pytest; {marker}"], + ) + assert get_package_dependencies("scitrack") == expected + + +def test_get_package_dependencies_if_installed_default_unchanged(monkeypatch): + # default if_installed=False still returns deps even when not installed + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: ["definitely_not_installed_xyz", "also_missing_abc"], + ) + assert get_package_dependencies("scitrack") == { + "core": ["definitely_not_installed_xyz", "also_missing_abc"], + } + + +def test_get_package_dependencies_if_installed_filters_mixed(monkeypatch): + # if_installed=True keeps installed names and drops uninstalled ones + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: ["pytest>=7", "definitely_not_installed_xyz"], + ) + assert get_package_dependencies("scitrack", if_installed=True) == { + "core": ["pytest"], + } + + +def test_get_package_dependencies_if_installed_drops_empty_group(monkeypatch): + # if_installed=True omits a group entirely when none of its deps are installed + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: [ + "pytest>=7", + "definitely_not_installed_xyz; extra == 'missing'", + "another_missing_abc; extra == 'missing'", + ], + ) + assert get_package_dependencies("scitrack", if_installed=True) == { + "core": ["pytest"], + } + + +def test_get_package_dependencies_if_installed_memoizes(monkeypatch): + # the same dep name in multiple groups triggers exactly one installation probe + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: [ + "pytest>=7", + "pytest; extra == 'test'", + "pytest; extra == 'dev'", + ], + ) + call_counts: dict[str, int] = {} + real_distribution = _scitrack.importlib.metadata.distribution + + def counting_distribution(name): + call_counts[name] = call_counts.get(name, 0) + 1 + return real_distribution(name) + + monkeypatch.setattr( + _scitrack.importlib.metadata, + "distribution", + counting_distribution, + ) + result = get_package_dependencies("scitrack", if_installed=True) + assert result == {"core": ["pytest"], "test": ["pytest"], "dev": ["pytest"]} + assert call_counts == {"pytest": 1} + + +def test_get_package_dependencies_if_installed_empty_requires_no_probe(monkeypatch): + # if_installed=True with no requires returns {} and never probes installation state + monkeypatch.setattr(_scitrack.importlib.metadata, "requires", lambda _: []) + probed: list[str] = [] + + def trap(name): + probed.append(name) + raise AssertionError("installation probe must not be called") + + monkeypatch.setattr(_scitrack.importlib.metadata, "distribution", trap) + assert get_package_dependencies("scitrack", if_installed=True) == {} + assert probed == [] + + def test_appending(logfile): """appending to an existing logfile should work""" LOGGER = CachingLogger(create_dir=True) @@ -327,6 +690,39 @@ def test_text_data_requires_label(logfile): LOGGER.shutdown() +def test_loglabel_values_are_strings(): + """LogLabel members format as their plain string value""" + from scitrack import LogLabel + + assert LogLabel.PARAMS == "params" + assert f"{LogLabel.PARAMS}" == "params" + assert str(LogLabel.MISC) == "misc" + + +def test_input_file_accepts_loglabel_enum(logfile): + """passing a LogLabel member as the label yields the same log line as the default""" + from scitrack import LogLabel + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.input_file(TEST_ROOTDIR / "sample-lf.fasta", label=LogLabel.INPUT_FILE) + LOGGER.shutdown() + contents = logfile.read_text() + assert "\tinput_file_path :" in contents + assert "\tinput_file_path md5sum :" in contents + + +def test_input_file_accepts_custom_string_label(logfile): + """custom string labels still work for back-compat""" + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.input_file(TEST_ROOTDIR / "sample-lf.fasta", label="my-tag") + LOGGER.shutdown() + contents = logfile.read_text() + assert "\tmy-tag :" in contents + assert "\tmy-tag md5sum :" in contents + + def test_logfile_path(logfile): """correctly assigned""" LOGGER = CachingLogger(create_dir=True, log_file_path=logfile) @@ -417,7 +813,7 @@ class _Frame: def test_log_versions_uses_caller_package_name(monkeypatch, logfile): - """log_versions resolves the caller's package name and writes its version""" + # log_versions resolves the caller's package name and writes its version LOGGER = CachingLogger(create_dir=True) LOGGER.log_file_path = logfile @@ -433,3 +829,718 @@ class _Frame: contents = logfile.read_text() assert "scitrack==" in contents + + +def test_log_versions_resolves_external_caller_package(monkeypatch, logfile): + # log_versions() captures the caller's frame directly, so its f_back is + # the consumer (numpy here) and that package's version is logged rather + # than scitrack's own + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + + class _Consumer: + f_globals = {"__name__": "numpy"} + + class _Frame: + f_back = _Consumer() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + LOGGER.log_versions() + LOGGER.shutdown() + + expect = f"numpy=={get_version_for_package('numpy')}" + contents = logfile.read_text() + assert expect in contents + + +def test_log_versions_emits_installed_deps_of_caller(monkeypatch, logfile): + # caller's get_package_dependencies(if_installed=True) is flattened into version lines + captured_args: dict[str, object] = {} + + def fake_deps(name, *, if_installed): + captured_args["name"] = name + captured_args["if_installed"] = if_installed + return {"core": ["pkg_a"], "dev": ["pkg_b"]} + + versions = {"scitrack": "9.9.9", "pkg_a": "1.1", "pkg_b": "2.2"} + monkeypatch.setattr(_scitrack, "get_package_dependencies", fake_deps) + monkeypatch.setattr(_scitrack, "get_version_for_package", lambda n: versions[n]) + + class _Parent: + f_globals = {"__name__": "scitrack"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.log_versions() + LOGGER.shutdown() + + contents = logfile.read_text() + assert "pkg_a==1.1" in contents + assert "pkg_b==2.2" in contents + assert captured_args == {"name": "scitrack", "if_installed": True} + + +def test_log_versions_dedups_user_pkg_overlapping_dep(monkeypatch, logfile): + # a name appearing in both deps and the user list yields exactly one version line + monkeypatch.setattr( + _scitrack, + "get_package_dependencies", + lambda name, *, if_installed: {"core": ["pkg_a", "pkg_b"]}, + ) + versions = {"scitrack": "9.9.9", "pkg_a": "1.1", "pkg_b": "2.2"} + monkeypatch.setattr(_scitrack, "get_version_for_package", lambda n: versions[n]) + + class _Parent: + f_globals = {"__name__": "scitrack"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.log_versions(["pkg_a"]) + LOGGER.shutdown() + + lines = [ln for ln in logfile.read_text().splitlines() if "\tversion :" in ln] + pkg_a_lines = [ln for ln in lines if "pkg_a==" in ln] + assert len(pkg_a_lines) == 1 + + +def test_log_versions_caller_first_then_alphabetical(monkeypatch, logfile): + # caller's version line precedes the union, which is emitted in alphabetical order + monkeypatch.setattr( + _scitrack, + "get_package_dependencies", + lambda name, *, if_installed: {"core": ["zeta"], "dev": ["alpha"]}, + ) + versions = {"scitrack": "9.9.9", "alpha": "0.1", "mid": "0.5", "zeta": "0.9"} + monkeypatch.setattr(_scitrack, "get_version_for_package", lambda n: versions[n]) + + class _Parent: + f_globals = {"__name__": "scitrack"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.log_versions(["mid"]) + LOGGER.shutdown() + + version_lines = [ + ln.split("\tversion : ", 1)[1] + for ln in logfile.read_text().splitlines() + if "\tversion : " in ln + ] + assert version_lines == ["scitrack==9.9.9", "alpha==0.1", "mid==0.5", "zeta==0.9"] + + +def test_log_versions_caller_in_user_list_not_duplicated(monkeypatch, logfile): + # caller's own name in `packages=` does not double up the caller version line + monkeypatch.setattr( + _scitrack, + "get_package_dependencies", + lambda name, *, if_installed: {}, + ) + versions = {"scitrack": "9.9.9"} + monkeypatch.setattr(_scitrack, "get_version_for_package", lambda n: versions[n]) + + class _Parent: + f_globals = {"__name__": "scitrack"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.log_versions(["scitrack"]) + LOGGER.shutdown() + + version_lines = [ + ln for ln in logfile.read_text().splitlines() if "\tversion :" in ln + ] + scitrack_lines = [ln for ln in version_lines if "scitrack==" in ln] + assert len(scitrack_lines) == 1 + + +def test_log_versions_uninstalled_dep_skipped(monkeypatch, logfile): + # uninstalled declared deps are dropped via if_installed=True before logging + monkeypatch.setattr( + _scitrack.importlib.metadata, + "requires", + lambda _: ["pkg_installed", "pkg_not_installed"], + ) + + def fake_distribution(name): + if name in {"pkg_installed", "scitrack"}: + return + raise _scitrack.importlib.metadata.PackageNotFoundError(name) + + monkeypatch.setattr( + _scitrack.importlib.metadata, + "distribution", + fake_distribution, + ) + + versions = {"scitrack": "9.9.9", "pkg_installed": "1.0"} + monkeypatch.setattr(_scitrack, "get_version_for_package", lambda n: versions[n]) + + class _Parent: + f_globals = {"__name__": "scitrack"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.log_versions() + LOGGER.shutdown() + + contents = logfile.read_text() + assert "pkg_installed==1.0" in contents + assert "pkg_not_installed" not in contents + + +def _make_session_log(logfile): + """write a representative scitrack session for log_summary tests""" + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.input_file(TEST_ROOTDIR / "sample-lf.fasta") + LOGGER.input_file(TEST_ROOTDIR / "sample-crlf.fasta") + LOGGER.log_args({"a": 1, "b": "abc"}) + LOGGER.log_versions(["numpy"]) + LOGGER.shutdown() + + +# timestamp field of the first line of a scitrack log file +def _first_line_timestamp(path): + first = Path(path).read_text().splitlines()[0] + return first.split("\t", 1)[0] + + +# hostname (host portion of the host:pid prefix) of the first line +def _first_line_hostname(path): + first = Path(path).read_text().splitlines()[0] + return first.split("\t", 2)[1].rsplit(":", 1)[0] + + +def test_log_summary_groups_built_in_labels(logfile): + """default call returns every standard scitrack label""" + _make_session_log(logfile) + summary = log_summary(logfile) + + assert "system_details" in summary + assert "python" in summary + assert "user" in summary + assert "command_string" in summary + assert "params" in summary + assert "version" in summary + assert "input_file_path" in summary + assert "input_file_path md5sum" in summary + + assert len(summary["system_details"]) == 1 + assert len(summary["input_file_path"]) == 2 + assert len(summary["input_file_path md5sum"]) == 2 + assert len(summary["version"]) >= 1 # at minimum, numpy from packages= + assert summary["params"] == ["{'a': 1, 'b': 'abc'}"] + + +def test_log_summary_includes_datetime(logfile): + # datetime value is the timestamp of the first logged line + _make_session_log(logfile) + summary = log_summary(logfile) + assert summary["datetime"] == [_first_line_timestamp(logfile)] + + +def test_log_summary_datetime_is_first_line(tmp_path): + # datetime captures only the first line's timestamp, not later ones + first_ts = "2026-06-09 10:00:00" + later_ts = "2026-06-09 11:30:45" + log = tmp_path / "two_times.log" + log.write_text( + f"{first_ts}\thost:1\tINFO\tparams : alpha\n" + f"{later_ts}\thost:1\tINFO\tparams : beta\n", + ) + summary = log_summary(log) + assert summary["datetime"] == [first_ts] + assert summary["datetime"] != [later_ts] + + +def test_log_summary_datetime_from_unrecognised_first_line(tmp_path): + # datetime is taken from the first parsable line even if its label is unknown + ts = "2026-06-09 10:00:00" + log = tmp_path / "unknown_first.log" + log.write_text( + f"{ts}\thost:1\tINFO\tnot_a_known_label : ignored\n" + f"{ts}\thost:1\tINFO\tparams : kept\n", + ) + summary = log_summary(log) + assert summary["datetime"] == [ts] + + +def test_log_summary_empty_file_has_no_datetime(tmp_path): + # a file with no parsable lines yields no datetime key + empty = tmp_path / "empty.log" + empty.write_text("") + assert "datetime" not in log_summary(empty) + + +def test_log_summary_includes_hostname(logfile): + # hostname value is the host portion of the first line's prefix + _make_session_log(logfile) + summary = log_summary(logfile) + assert summary["hostname"] == [_first_line_hostname(logfile)] + + +def test_log_summary_hostname_strips_pid(tmp_path): + # hostname drops the :pid suffix, even for hosts containing colons + ts = "2026-06-09 10:00:00" + log = tmp_path / "host.log" + log.write_text(f"{ts}\tfqdn:host:12345\tINFO\tparams : alpha\n") + assert log_summary(log)["hostname"] == ["fqdn:host"] + + +def test_log_summary_includes_os(logfile): + # os value is the system_details value with the system= prefix stripped + _make_session_log(logfile) + summary = log_summary(logfile) + expected = summary["system_details"][0].removeprefix("system=") + assert summary["os"] == [expected] + + +def test_log_summary_os_absent_without_system_details(tmp_path): + # no system_details line means no os key + log = tmp_path / "no_sys.log" + log.write_text("2026-06-09 10:00:00\thost:1\tINFO\tparams : alpha\n") + assert "os" not in log_summary(log) + + +def test_log_summary_os_without_system_prefix(tmp_path): + # a system_details value lacking the system= prefix is used verbatim + ts = "2026-06-09 10:00:00" + log = tmp_path / "raw_sys.log" + log.write_text(f"{ts}\thost:1\tINFO\tsystem_details : Darwin 25.5.0\n") + assert log_summary(log)["os"] == ["Darwin 25.5.0"] + + +def test_log_summary_os_is_first_system_details(tmp_path): + # os captures only the first system_details line + ts = "2026-06-09 10:00:00" + log = tmp_path / "two_sys.log" + log.write_text( + f"{ts}\thost:1\tINFO\tsystem_details : system=first\n" + f"{ts}\thost:1\tINFO\tsystem_details : system=second\n", + ) + assert log_summary(log)["os"] == ["first"] + + +def test_log_summary_default_path_drops_reserved_label_line(tmp_path): + # a reserved-named label line is silently dropped on the default path + # and the derived os value still wins + ts = "2026-06-09 10:00:00" + log = tmp_path / "reserved_line.log" + log.write_text( + f"{ts}\thost:1\tINFO\tsystem_details : system=real\n" + f"{ts}\thost:1\tINFO\tos : spoofed\n", + ) + summary = log_summary(log) + assert summary["os"] == ["real"] + + +@pytest.mark.parametrize("label", ["datetime", "hostname", "os"]) +def test_log_summary_reserved_label_rejected(tmp_path, label): + # reserved keys cannot be requested via labels + log = tmp_path / "synth.log" + log.write_text("2026-06-09 10:00:00\thost:1\tINFO\tparams : alpha\n") + with pytest.raises(ValueError, match="reserved"): + log_summary(log, labels=[label]) + + +@pytest.mark.parametrize("label", ["datetime", "hostname", "os"]) +def test_log_summary_all_labels_rejects_reserved_line(tmp_path, label): + # an all_labels capture of a reserved-labelled line is rejected + ts = "2026-06-09 10:00:00" + log = tmp_path / "synth.log" + log.write_text( + f"{ts}\thost:1\tINFO\tparams : alpha\n{ts}\thost:1\tINFO\t{label} : whatever\n", + ) + with pytest.raises(ValueError, match="reserved"): + log_summary(log, all_labels=True) + + +def test_log_summary_preserves_values_with_colons(logfile): + """`params : {'k': 'v'}` value is captured intact even though it contains colons""" + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.log_args({"x": 1, "y": "needs : space"}) + LOGGER.shutdown() + + summary = log_summary(logfile) + assert summary["params"] == ["{'x': 1, 'y': 'needs : space'}"] + + +def test_log_summary_accepts_pathlike(logfile): + """passing a Path object works, not just str""" + _make_session_log(logfile) + assert log_summary(Path(logfile)) == log_summary(str(logfile)) + + +def test_log_summary_md5sum_default_includes_output(logfile): + """output_file_path md5sum is recognised without specifying labels""" + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.output_file(TEST_ROOTDIR / "sample-lf.fasta") + LOGGER.shutdown() + + summary = log_summary(logfile) + assert "output_file_path" in summary + assert "output_file_path md5sum" in summary + + +def test_log_summary_extra_labels(logfile): + """user-supplied labels widen the recognised set""" + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.input_file(TEST_ROOTDIR / "sample-lf.fasta", label="my-tag") + LOGGER.shutdown() + + default = log_summary(logfile) + assert "my-tag" not in default + assert "my-tag md5sum" not in default + + widened = log_summary(logfile, labels=["my-tag", "my-tag md5sum"]) + assert len(widened["my-tag"]) == 1 + assert len(widened["my-tag md5sum"]) == 1 + + +def test_log_summary_loglabel_enum_as_extra_label(logfile): + """LogLabel members can be passed via the labels list (back-compat with strings)""" + _make_session_log(logfile) + # PARAMS is already recognised by default. Passing it again is a no-op + # but should not double-count entries. + summary = log_summary(logfile, labels=[LogLabel.PARAMS]) + assert len(summary["params"]) == 1 + + +def test_log_summary_empty_file(tmp_path): + """empty log file yields empty dict""" + empty = tmp_path / "empty.log" + empty.write_text("") + assert log_summary(empty) == {} + + +def test_log_summary_ignores_unknown_labels(tmp_path): + """lines with an unrecognised label are skipped""" + ts = "2026-06-09 10:00:00" + host = "host" + log = tmp_path / "synth.log" + log.write_text( + f"{ts}\t{host}:1\tINFO\tparams : alpha\n" + f"{ts}\t{host}:1\tINFO\tnot_a_known_label : ignored\n" + f"{ts}\t{host}:1\tINFO\tparams : beta\n", + ) + summary = log_summary(log) + assert summary == { + "datetime": [ts], + "hostname": [host], + "params": ["alpha", "beta"], + } + + +def test_log_summary_skips_malformed_lines(tmp_path): + """blank lines, lines with too few tabs, and lines without ' : ' are all skipped""" + ts = "2026-06-09 10:00:00" + host = "host" + log = tmp_path / "malformed.log" + log.write_text( + "\n" # blank + "no tabs at all\n" # < 4 fields + f"{ts}\t{host}:1\tINFO\tno_colon_separator\n" # no ' : ' + f"{ts}\t{host}:1\tINFO\tparams : kept\n", + ) + assert log_summary(log) == { + "datetime": [ts], + "hostname": [host], + "params": ["kept"], + } + + +def test_log_summary_multiple_entries_preserve_order(tmp_path): + """multiple entries under the same label come back in file order""" + ts = "2026-06-09 10:00:00" + host = "host" + log = tmp_path / "ordered.log" + log.write_text( + f"{ts}\t{host}:1\tINFO\tversion : a==1\n" + f"{ts}\t{host}:1\tINFO\tversion : b==2\n" + f"{ts}\t{host}:1\tINFO\tversion : c==3\n", + ) + assert log_summary(log) == { + "datetime": [ts], + "hostname": [host], + "version": ["a==1", "b==2", "c==3"], + } + + +def test_log_summary_recognises_license_label(tmp_path): + """license lines emitted by log_licenses are captured by default""" + ts = "2026-06-09 10:00:00" + host = "host" + log = tmp_path / "lic.log" + log.write_text( + f"{ts}\t{host}:1\tINFO\tlicense : scitrack==BSD-3-Clause\n" + f"{ts}\t{host}:1\tINFO\tlicense : numpy==BSD-3-Clause\n", + ) + assert log_summary(log) == { + "datetime": [ts], + "hostname": [host], + "license": ["scitrack==BSD-3-Clause", "numpy==BSD-3-Clause"], + } + + +def test_log_summary_all_labels_captures_unknown(tmp_path): + """all_labels=True records every label, including ones not in the recognised set""" + ts = "2026-06-09 10:00:00" + host = "host" + log = tmp_path / "all.log" + log.write_text( + f"{ts}\t{host}:1\tINFO\tparams : standard\n" + f"{ts}\t{host}:1\tINFO\tbespoke_tag : x\n" + f"{ts}\t{host}:1\tINFO\tanother_tag : y\n", + ) + + default = log_summary(log) + assert "bespoke_tag" not in default + assert "another_tag" not in default + + everything = log_summary(log, all_labels=True) + assert everything == { + "datetime": [ts], + "hostname": [host], + "params": ["standard"], + "bespoke_tag": ["x"], + "another_tag": ["y"], + } + + +def test_loglabel_license_value(): + # LogLabel exposes a LICENSE member that formats as "license" + assert LogLabel.LICENSE == "license" + assert f"{LogLabel.LICENSE}" == "license" + + +def test_get_package_licenses_returns_dict_for_installed(): + # returns a {name: license_string} mapping for installed packages + got = get_package_licenses(["pytest"]) + assert set(got) == {"pytest"} + assert isinstance(got["pytest"], str) + assert got["pytest"] + + +def test_get_package_licenses_raises_for_uninstalled(): + # uninstalled package name -> PackageNotFoundError carrying the name + from importlib.metadata import PackageNotFoundError + + with pytest.raises(PackageNotFoundError, match="definitely_not_installed_xyz"): + get_package_licenses(["definitely_not_installed_xyz"]) + + +def test_get_package_licenses_prefers_license_expression(monkeypatch): + # PEP 639 License-Expression wins when both fields are present + monkeypatch.setattr( + _scitrack.importlib.metadata, + "metadata", + lambda _: {"License-Expression": "MIT", "License": "Apache-2.0"}, + ) + assert get_package_licenses(["anything"]) == {"anything": "MIT"} + + +def test_get_package_licenses_falls_back_to_license(monkeypatch): + # legacy License field is used when License-Expression is missing + monkeypatch.setattr( + _scitrack.importlib.metadata, + "metadata", + lambda _: {"License": "BSD-3-Clause"}, + ) + assert get_package_licenses(["anything"]) == {"anything": "BSD-3-Clause"} + + +def test_get_package_licenses_unknown_when_missing(monkeypatch): + # neither License-Expression nor License declared -> "UNKNOWN" + monkeypatch.setattr(_scitrack.importlib.metadata, "metadata", lambda _: {}) + assert get_package_licenses(["anything"]) == {"anything": "UNKNOWN"} + + +@pytest.mark.parametrize( + "meta", + [ + pytest.param({"License-Expression": "", "License": "MIT"}, id="empty_expr"), + pytest.param( + {"License-Expression": "UNKNOWN", "License": "MIT"}, + id="sentinel_expr", + ), + ], +) +def test_get_package_licenses_falls_through_empty_or_sentinel(monkeypatch, meta): + # an empty or literal-"UNKNOWN" License-Expression falls through to License + monkeypatch.setattr(_scitrack.importlib.metadata, "metadata", lambda _: meta) + assert get_package_licenses(["anything"]) == {"anything": "MIT"} + + +def test_get_package_licenses_partial_raises_eagerly(monkeypatch): + # a missing name in the middle of the list raises rather than returning a partial dict + from importlib.metadata import PackageNotFoundError + + def fake_metadata(name): + if name == "definitely_not_installed_xyz": + raise PackageNotFoundError(name) + return {"License": "MIT"} + + monkeypatch.setattr(_scitrack.importlib.metadata, "metadata", fake_metadata) + with pytest.raises(PackageNotFoundError, match="definitely_not_installed_xyz"): + get_package_licenses(["pkg_a", "definitely_not_installed_xyz", "pkg_b"]) + + +def test_log_licenses_uses_caller_package(monkeypatch, logfile): + # caller's package is resolved from frame globals and its license is logged + monkeypatch.setattr( + _scitrack, + "_license_for_package", + lambda name: {"scitrack": "BSD-3-Clause"}[name], + ) + monkeypatch.setattr( + _scitrack, + "get_package_dependencies", + lambda name, *, if_installed: {}, + ) + + class _Parent: + f_globals = {"__name__": "scitrack"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.log_licenses() + LOGGER.shutdown() + + contents = logfile.read_text() + assert "\tlicense : scitrack==BSD-3-Clause" in contents + + +def test_log_licenses_emits_installed_deps(monkeypatch, logfile): + # caller's deps (resolved with if_installed=True) are flattened into license lines + captured: dict[str, object] = {} + + def fake_deps(name, *, if_installed): + captured["name"] = name + captured["if_installed"] = if_installed + return {"core": ["pkg_a"], "dev": ["pkg_b"]} + + licenses = {"scitrack": "BSD-3-Clause", "pkg_a": "MIT", "pkg_b": "Apache-2.0"} + monkeypatch.setattr(_scitrack, "get_package_dependencies", fake_deps) + monkeypatch.setattr(_scitrack, "_license_for_package", lambda n: licenses[n]) + + class _Parent: + f_globals = {"__name__": "scitrack"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.log_licenses() + LOGGER.shutdown() + + contents = logfile.read_text() + assert "pkg_a==MIT" in contents + assert "pkg_b==Apache-2.0" in contents + assert captured == {"name": "scitrack", "if_installed": True} + + +def test_log_licenses_partial_list_raises_eagerly(logfile): + # mixed list: bad name aborts before any "license :" line is written + from importlib.metadata import PackageNotFoundError + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + with pytest.raises(PackageNotFoundError, match="definitely_not_installed_xyz"): + LOGGER.log_licenses(["pytest", "definitely_not_installed_xyz"]) + LOGGER.shutdown() + assert not any("license :" in line for line in logfile.read_text().splitlines()) + + +def test_log_licenses_caller_first_then_alphabetical_dedup(monkeypatch, logfile): + # caller's line precedes the union; the union is alphabetical and de-duplicated + monkeypatch.setattr( + _scitrack, + "get_package_dependencies", + lambda name, *, if_installed: {"core": ["zeta", "alpha"], "dev": ["alpha"]}, + ) + licenses = { + "scitrack": "BSD-3-Clause", + "alpha": "MIT", + "mid": "Apache-2.0", + "zeta": "GPL-3.0", + } + monkeypatch.setattr(_scitrack, "_license_for_package", lambda n: licenses[n]) + + class _Parent: + f_globals = {"__name__": "scitrack"} + + class _Frame: + f_back = _Parent() + + monkeypatch.setattr(_scitrack.inspect, "currentframe", lambda: _Frame()) + + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.log_licenses(["mid", "alpha", "scitrack"]) + LOGGER.shutdown() + + license_lines = [ + ln.split("\tlicense : ", 1)[1] + for ln in logfile.read_text().splitlines() + if "\tlicense : " in ln + ] + assert license_lines == [ + "scitrack==BSD-3-Clause", + "alpha==MIT", + "mid==Apache-2.0", + "zeta==GPL-3.0", + ] + + +@pytest.mark.parametrize( + "make_frame", + [ + pytest.param(lambda: None, id="no_current_frame"), + pytest.param(lambda: type("_F", (), {"f_back": None})(), id="no_parent_frame"), + ], +) +def test_log_licenses_silent_on_frame_failure(monkeypatch, logfile, make_frame): + # restricted runtimes (None) and top-of-stack callers (f_back is None) both no-op + monkeypatch.setattr(_scitrack.inspect, "currentframe", make_frame) + LOGGER = CachingLogger(create_dir=True) + LOGGER.log_file_path = logfile + LOGGER.log_licenses() + LOGGER.shutdown() + assert not any("license :" in line for line in logfile.read_text().splitlines())