import logging
import time
from dataclasses import dataclass
from enum import IntEnum
from pathlib import Path
from rmote.protocol import Tool, process
[docs]
class State(IntEnum):
PRESENT = 0
ABSENT = 1
LATEST = 2
[docs]
@dataclass
class Package:
name: str
state: int = State.PRESENT
version: str = ""
[docs]
@classmethod
def parse(cls, s: str | object, state: State | int = State.PRESENT) -> "Package":
coerced = State(state)
if isinstance(s, cls):
return cls(name=s.name, version=s.version, state=s.state)
if not isinstance(s, str):
raise TypeError(f"Expected str or Package, got {type(s).__name__!r}")
return cls.from_string(s, state=coerced)
[docs]
@classmethod
def from_string(cls, s: str, state: State = State.PRESENT) -> "Package":
if "=" in s:
name, version = s.split("=", 1)
return cls(name=name, version=version, state=state)
return cls(name=s, version="", state=state)
def __str__(self) -> str:
return self.name # pacman does not support name=version pinning
[docs]
@dataclass
class Result:
name: str
changed: bool
version: str = ""
[docs]
@classmethod
def from_package(cls, package: "Package", changed: bool) -> "Result":
return cls(name=package.name, version=package.version, changed=changed)
class Backend:
@staticmethod
def pacman(*args: str) -> tuple[int, str, str]:
logging.debug("calling pacman with args: %s", args)
result = process(
"pacman",
"--noconfirm",
*args,
capture_output=True,
text=True,
)
return result.returncode, result.stdout, result.stderr
@staticmethod
def query(name: str) -> tuple[bool, str]:
"""Return (installed, version) for a package."""
rc, stdout, _ = Backend.pacman("-Q", name)
if rc != 0:
return False, ""
parts = stdout.strip().split()
return True, (parts[1] if len(parts) >= 2 else "")
@staticmethod
def parse_upgradable(stdout: str) -> list[str]:
"""Parse package names from 'pacman -Qu' output."""
return [line.split()[0] for line in stdout.splitlines() if line.split()]
[docs]
class Pacman(Tool):
"""Manage Arch Linux packages via pacman. Requires root on the remote host."""
[docs]
@staticmethod
def update(ttl: int | float = -1) -> bool:
"""Synchronise the package database (``pacman -Sy``), with optional TTL-based skipping.
Args:
ttl: Minimum age in seconds of the last sync before re-running.
``-1`` (default) always synchronises.
Returns:
``True`` if ``pacman -Sy`` was executed, ``False`` if the database
is fresher than *ttl* seconds.
Raises:
RuntimeError: If ``pacman -Sy`` exits with a non-zero status.
"""
stamp = Path("/var/lib/pacman/.rmote-update")
if ttl >= 0:
try:
age = time.time() - stamp.stat().st_mtime
except FileNotFoundError:
age = float("inf")
if age < ttl:
return False
rc, _, err = Backend.pacman("-Sy")
if rc != 0:
raise RuntimeError(f"pacman -Sy failed:\n{err}")
stamp.touch()
return True
[docs]
@staticmethod
def package(package: str | Package, state: State | int = State.PRESENT) -> Result:
"""Install, remove, or upgrade a single package.
Args:
package: Package name or a :class:`Package` instance. Version
pinning (``name=version``) is parsed but ignored by pacman.
state: Desired state - :attr:`State.PRESENT` (install if absent),
:attr:`State.ABSENT` (remove if installed), or
:attr:`State.LATEST` (install or upgrade).
Returns:
:class:`Result` with the package name, installed version, and
whether the system was changed.
Raises:
RuntimeError: If the underlying ``pacman`` invocation fails.
"""
package = Package.parse(package, state=state)
installed, version = Backend.query(package.name)
if state == State.PRESENT:
if installed:
return Result(name=package.name, version=version, changed=False)
rc, _, err = Backend.pacman("-S", package.name)
if rc != 0:
raise RuntimeError(f"pacman -S {package.name!r} failed:\n{err}")
_, version = Backend.query(package.name)
return Result(name=package.name, version=version, changed=True)
if state == State.ABSENT:
if not installed:
return Result(name=package.name, version="", changed=False)
rc, _, err = Backend.pacman("-R", package.name)
if rc != 0:
raise RuntimeError(f"pacman -R {package.name!r} failed:\n{err}")
return Result(name=package.name, version="", changed=True)
if state == State.LATEST:
if not installed:
rc, _, err = Backend.pacman("-S", package.name)
if rc != 0:
raise RuntimeError(f"pacman -S {package.name!r} failed:\n{err}")
_, version = Backend.query(package.name)
return Result(name=package.name, version=version, changed=True)
_, stdout, _ = Backend.pacman("-Qu")
if package.name not in Backend.parse_upgradable(stdout):
return Result(name=package.name, version=version, changed=False)
rc, _, err = Backend.pacman("-S", package.name)
if rc != 0:
raise RuntimeError(f"pacman -S {package.name!r} failed:\n{err}")
_, version = Backend.query(package.name)
return Result(name=package.name, version=version, changed=True)
raise ValueError(f"Unknown state: {state!r}")
[docs]
@classmethod
def converge(cls, *packages: str | Package) -> list[Result]:
"""Ensure all *packages* are present.
Args:
*packages: Package names or :class:`Package` instances.
Returns:
List of :class:`Result` objects, one per package, in the same
order as the input.
"""
return [cls.package(pkg) for pkg in packages]