Coverage for src / openenv / manifests / lockfile.py: 89.47%
118 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 13:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 13:36 +0000
1"""Lockfile creation and serialization."""
3from __future__ import annotations
5import json
6import re
7import subprocess
8from pathlib import Path
9from typing import Any, Callable
11from openenv.core.errors import LockResolutionError, ValidationError
12from openenv.core.models import Lockfile, Manifest
13from openenv.core.utils import sha256_text, stable_json_dumps
15_DIRECT_REFERENCE_PATTERN = re.compile(
16 r"^\s*([A-Za-z0-9_.-]+(?:\[[A-Za-z0-9_,.-]+\])?)\s*@\s*(\S+)\s*$"
17)
18_PINNED_REQUIREMENT_PATTERN = re.compile(
19 r"^\s*([A-Za-z0-9_.-]+(?:\[[A-Za-z0-9_,.-]+\])?)\s*==\s*([A-Za-z0-9_.!+-]+)\s*$"
20)
21_PINNED_NODE_REQUIREMENT_PATTERN = re.compile(
22 r"^\s*(?P<name>(?:@[A-Za-z0-9._-]+/)?[A-Za-z0-9._-]+)@(?P<version>[A-Za-z0-9][A-Za-z0-9.+_-]*)\s*$"
23)
24_DIGEST_PATTERN = re.compile(r"^(?P<image>.+)@(?P<digest>sha256:[a-fA-F0-9]{64})$")
27def build_lockfile(
28 manifest: Manifest,
29 raw_manifest_text: str,
30 *,
31 resolver: Callable[[str], dict[str, str]] | None = None,
32) -> Lockfile:
33 """Create a deterministic lockfile from a manifest."""
34 manifest_hash = sha256_text(stable_json_dumps(manifest.to_dict(), indent=None))
35 image_info = resolve_base_image(manifest.runtime.base_image, resolver=resolver)
36 python_packages = [
37 _resolve_python_requirement(requirement)
38 for requirement in manifest.runtime.python_packages
39 ]
40 node_packages = [
41 _resolve_node_requirement(requirement)
42 for requirement in manifest.runtime.node_packages
43 ]
44 return Lockfile(
45 lock_version=1,
46 manifest_hash=manifest_hash,
47 base_image={
48 "digest": image_info["digest"],
49 "reference": manifest.runtime.base_image,
50 "resolved_reference": image_info["resolved_reference"],
51 },
52 python_packages=python_packages,
53 node_packages=node_packages,
54 system_packages=list(manifest.runtime.system_packages),
55 source_snapshot={
56 **manifest.source_snapshot(),
57 "raw_manifest_sha256": sha256_text(raw_manifest_text),
58 },
59 )
62def resolve_base_image(
63 base_image: str,
64 *,
65 resolver: Callable[[str], dict[str, str]] | None = None,
66) -> dict[str, str]:
67 """Resolve the Docker base image to a content-addressed reference."""
68 if resolver is not None:
69 return resolver(base_image)
71 digest_match = _DIGEST_PATTERN.match(base_image)
72 if digest_match:
73 return {
74 "digest": digest_match.group("digest"),
75 "resolved_reference": base_image,
76 }
78 try:
79 completed = _inspect_base_image(base_image)
80 except OSError as exc:
81 raise LockResolutionError(
82 "Docker is required to resolve an unpinned base image. "
83 "Pin runtime.base_image with @sha256 or make docker available."
84 ) from exc
85 except subprocess.CalledProcessError as exc:
86 if _is_missing_local_image_error(exc): 86 ↛ 89line 86 didn't jump to line 89 because the condition on line 86 was always true
87 completed = _pull_and_inspect_base_image(base_image)
88 else:
89 raise LockResolutionError(
90 "Unable to resolve runtime.base_image. "
91 "Pin it with @sha256 or ensure the image is present locally. "
92 f"Docker said: {_docker_error_detail(exc)}"
93 ) from exc
95 try:
96 repo_digests = json.loads(completed.stdout.strip() or "[]")
97 except json.JSONDecodeError as exc:
98 raise LockResolutionError(
99 "Docker returned an unreadable RepoDigests payload while resolving the base image."
100 ) from exc
101 if not repo_digests:
102 raise LockResolutionError(
103 "Docker did not return a RepoDigest for the base image. "
104 "Pin runtime.base_image with @sha256 for deterministic locks."
105 )
107 resolved_reference = repo_digests[0]
108 digest_match = _DIGEST_PATTERN.match(resolved_reference)
109 if digest_match is None:
110 raise LockResolutionError(
111 f"Resolved base image did not include a digest: {resolved_reference}"
112 )
113 digest = digest_match.group("digest")
114 return {
115 "digest": digest,
116 "resolved_reference": _attach_digest(base_image, digest),
117 }
120def _inspect_base_image(base_image: str) -> subprocess.CompletedProcess[str]:
121 """Inspect a local Docker image and return the raw `RepoDigests` response."""
122 return subprocess.run(
123 [
124 "docker",
125 "image",
126 "inspect",
127 base_image,
128 "--format",
129 "{{json .RepoDigests}}",
130 ],
131 check=True,
132 capture_output=True,
133 text=True,
134 )
137def _pull_and_inspect_base_image(base_image: str) -> subprocess.CompletedProcess[str]:
138 """Pull a missing base image and then inspect it locally to obtain its digest."""
139 try:
140 subprocess.run(
141 ["docker", "image", "pull", base_image],
142 check=True,
143 capture_output=True,
144 text=True,
145 )
146 except OSError as exc:
147 raise LockResolutionError(
148 "Docker is required to pull and resolve an unpinned base image. "
149 "Pin runtime.base_image with @sha256 or make docker available."
150 ) from exc
151 except subprocess.CalledProcessError as exc:
152 raise LockResolutionError(
153 "Unable to resolve runtime.base_image locally and docker pull failed. "
154 f"Docker said: {_docker_error_detail(exc)}"
155 ) from exc
157 try:
158 return _inspect_base_image(base_image)
159 except OSError as exc:
160 raise LockResolutionError(
161 "Docker is required to inspect the pulled base image. "
162 "Pin runtime.base_image with @sha256 or make docker available."
163 ) from exc
164 except subprocess.CalledProcessError as exc:
165 raise LockResolutionError(
166 "Docker pulled runtime.base_image but it still could not be inspected locally. "
167 f"Docker said: {_docker_error_detail(exc)}"
168 ) from exc
171def _is_missing_local_image_error(exc: subprocess.CalledProcessError) -> bool:
172 """Return whether Docker reported that the requested image is absent locally."""
173 detail = _docker_error_detail(exc).lower()
174 return "no such image" in detail or "no such object" in detail
177def _docker_error_detail(exc: subprocess.CalledProcessError) -> str:
178 """Extract a human-readable stderr message from a failed Docker command."""
179 return exc.stderr.strip() if exc.stderr else "unknown docker error"
182def _attach_digest(base_image: str, digest: str) -> str:
183 """Attach a resolved digest to the original image reference while preserving its tag."""
184 return f"{base_image}@{digest}"
187def _resolve_python_requirement(requirement: str) -> dict[str, str]:
188 """Normalize a pinned Python requirement into the lockfile package schema."""
189 direct_match = _DIRECT_REFERENCE_PATTERN.match(requirement)
190 if direct_match is not None: 190 ↛ 191line 190 didn't jump to line 191 because the condition on line 190 was never true
191 return {
192 "kind": "direct",
193 "name": direct_match.group(1).lower(),
194 "requirement": requirement,
195 "url": direct_match.group(2),
196 }
198 pinned_match = _PINNED_REQUIREMENT_PATTERN.match(requirement)
199 if pinned_match is not None:
200 return {
201 "kind": "pinned",
202 "name": pinned_match.group(1).lower(),
203 "requirement": requirement,
204 "version": pinned_match.group(2),
205 }
207 raise LockResolutionError(
208 "OpenClawenv v1 locks only exact Python requirements. "
209 f"Use 'package==version' or 'name @ URL': {requirement}"
210 )
213def _resolve_node_requirement(requirement: str) -> dict[str, str]:
214 """Normalize a pinned Node.js requirement into the lockfile package schema."""
215 pinned_match = _PINNED_NODE_REQUIREMENT_PATTERN.match(requirement)
216 if pinned_match is not None:
217 return {
218 "kind": "pinned",
219 "name": pinned_match.group("name"),
220 "requirement": requirement,
221 "version": pinned_match.group("version"),
222 }
224 raise LockResolutionError(
225 "OpenClawenv v1 locks only exact Node requirements. "
226 f"Use 'package@version' or '@scope/package@version': {requirement}"
227 )
230def dump_lockfile(lockfile: Lockfile) -> str:
231 """Serialize a lockfile deterministically."""
232 return stable_json_dumps(lockfile.to_dict(), indent=2) + "\n"
235def write_lockfile(path: str | Path, lockfile: Lockfile) -> None:
236 """Write a lockfile to disk."""
237 Path(path).write_text(dump_lockfile(lockfile), encoding="utf-8")
240def load_lockfile(path: str | Path) -> Lockfile:
241 """Load a lockfile from JSON."""
242 lock_path = Path(path)
243 try:
244 data = json.loads(lock_path.read_text(encoding="utf-8"))
245 except FileNotFoundError as exc:
246 raise ValidationError(f"Lockfile not found: {lock_path}") from exc
247 except json.JSONDecodeError as exc:
248 raise ValidationError(f"Invalid JSON in {lock_path}: {exc}") from exc
249 return parse_lockfile(data)
252def parse_lockfile(data: dict[str, Any]) -> Lockfile:
253 """Validate a parsed lockfile payload."""
254 if not isinstance(data, dict):
255 raise ValidationError("Lockfile root must be an object.")
256 required_keys = {
257 "lock_version",
258 "manifest_hash",
259 "base_image",
260 "python_packages",
261 "node_packages",
262 "system_packages",
263 "source_snapshot",
264 }
265 missing = sorted(required_keys - set(data))
266 if missing:
267 raise ValidationError(f"Lockfile is missing required keys: {', '.join(missing)}")
268 if data["lock_version"] != 1: 268 ↛ 269line 268 didn't jump to line 269 because the condition on line 268 was never true
269 raise ValidationError("lock_version must be set to 1.")
270 if not isinstance(data["base_image"], dict):
271 raise ValidationError("lockfile.base_image must be an object.")
272 if not isinstance(data["python_packages"], list): 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true
273 raise ValidationError("lockfile.python_packages must be a list.")
274 if not isinstance(data["node_packages"], list): 274 ↛ 275line 274 didn't jump to line 275 because the condition on line 274 was never true
275 raise ValidationError("lockfile.node_packages must be a list.")
276 if not isinstance(data["system_packages"], list): 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true
277 raise ValidationError("lockfile.system_packages must be a list.")
278 if not isinstance(data["source_snapshot"], dict): 278 ↛ 279line 278 didn't jump to line 279 because the condition on line 278 was never true
279 raise ValidationError("lockfile.source_snapshot must be an object.")
280 return Lockfile(
281 lock_version=data["lock_version"],
282 manifest_hash=_require_string(data, "manifest_hash"),
283 base_image=data["base_image"],
284 python_packages=data["python_packages"],
285 node_packages=data["node_packages"],
286 system_packages=data["system_packages"],
287 source_snapshot=data["source_snapshot"],
288 )
291def _require_string(data: dict[str, Any], key: str) -> str:
292 """Require a non-empty string when validating deserialized lockfile payloads."""
293 value = data.get(key)
294 if not isinstance(value, str) or not value.strip():
295 raise ValidationError(f"{key} must be a non-empty string.")
296 return value