Coverage for src / openenv / manifests / lockfile.py: 89.47%

118 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-25 13:36 +0000

1"""Lockfile creation and serialization.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import re 

7import subprocess 

8from pathlib import Path 

9from typing import Any, Callable 

10 

11from openenv.core.errors import LockResolutionError, ValidationError 

12from openenv.core.models import Lockfile, Manifest 

13from openenv.core.utils import sha256_text, stable_json_dumps 

14 

15_DIRECT_REFERENCE_PATTERN = re.compile( 

16 r"^\s*([A-Za-z0-9_.-]+(?:\[[A-Za-z0-9_,.-]+\])?)\s*@\s*(\S+)\s*$" 

17) 

18_PINNED_REQUIREMENT_PATTERN = re.compile( 

19 r"^\s*([A-Za-z0-9_.-]+(?:\[[A-Za-z0-9_,.-]+\])?)\s*==\s*([A-Za-z0-9_.!+-]+)\s*$" 

20) 

21_PINNED_NODE_REQUIREMENT_PATTERN = re.compile( 

22 r"^\s*(?P<name>(?:@[A-Za-z0-9._-]+/)?[A-Za-z0-9._-]+)@(?P<version>[A-Za-z0-9][A-Za-z0-9.+_-]*)\s*$" 

23) 

24_DIGEST_PATTERN = re.compile(r"^(?P<image>.+)@(?P<digest>sha256:[a-fA-F0-9]{64})$") 

25 

26 

27def build_lockfile( 

28 manifest: Manifest, 

29 raw_manifest_text: str, 

30 *, 

31 resolver: Callable[[str], dict[str, str]] | None = None, 

32) -> Lockfile: 

33 """Create a deterministic lockfile from a manifest.""" 

34 manifest_hash = sha256_text(stable_json_dumps(manifest.to_dict(), indent=None)) 

35 image_info = resolve_base_image(manifest.runtime.base_image, resolver=resolver) 

36 python_packages = [ 

37 _resolve_python_requirement(requirement) 

38 for requirement in manifest.runtime.python_packages 

39 ] 

40 node_packages = [ 

41 _resolve_node_requirement(requirement) 

42 for requirement in manifest.runtime.node_packages 

43 ] 

44 return Lockfile( 

45 lock_version=1, 

46 manifest_hash=manifest_hash, 

47 base_image={ 

48 "digest": image_info["digest"], 

49 "reference": manifest.runtime.base_image, 

50 "resolved_reference": image_info["resolved_reference"], 

51 }, 

52 python_packages=python_packages, 

53 node_packages=node_packages, 

54 system_packages=list(manifest.runtime.system_packages), 

55 source_snapshot={ 

56 **manifest.source_snapshot(), 

57 "raw_manifest_sha256": sha256_text(raw_manifest_text), 

58 }, 

59 ) 

60 

61 

62def resolve_base_image( 

63 base_image: str, 

64 *, 

65 resolver: Callable[[str], dict[str, str]] | None = None, 

66) -> dict[str, str]: 

67 """Resolve the Docker base image to a content-addressed reference.""" 

68 if resolver is not None: 

69 return resolver(base_image) 

70 

71 digest_match = _DIGEST_PATTERN.match(base_image) 

72 if digest_match: 

73 return { 

74 "digest": digest_match.group("digest"), 

75 "resolved_reference": base_image, 

76 } 

77 

78 try: 

79 completed = _inspect_base_image(base_image) 

80 except OSError as exc: 

81 raise LockResolutionError( 

82 "Docker is required to resolve an unpinned base image. " 

83 "Pin runtime.base_image with @sha256 or make docker available." 

84 ) from exc 

85 except subprocess.CalledProcessError as exc: 

86 if _is_missing_local_image_error(exc): 86 ↛ 89line 86 didn't jump to line 89 because the condition on line 86 was always true

87 completed = _pull_and_inspect_base_image(base_image) 

88 else: 

89 raise LockResolutionError( 

90 "Unable to resolve runtime.base_image. " 

91 "Pin it with @sha256 or ensure the image is present locally. " 

92 f"Docker said: {_docker_error_detail(exc)}" 

93 ) from exc 

94 

95 try: 

96 repo_digests = json.loads(completed.stdout.strip() or "[]") 

97 except json.JSONDecodeError as exc: 

98 raise LockResolutionError( 

99 "Docker returned an unreadable RepoDigests payload while resolving the base image." 

100 ) from exc 

101 if not repo_digests: 

102 raise LockResolutionError( 

103 "Docker did not return a RepoDigest for the base image. " 

104 "Pin runtime.base_image with @sha256 for deterministic locks." 

105 ) 

106 

107 resolved_reference = repo_digests[0] 

108 digest_match = _DIGEST_PATTERN.match(resolved_reference) 

109 if digest_match is None: 

110 raise LockResolutionError( 

111 f"Resolved base image did not include a digest: {resolved_reference}" 

112 ) 

113 digest = digest_match.group("digest") 

114 return { 

115 "digest": digest, 

116 "resolved_reference": _attach_digest(base_image, digest), 

117 } 

118 

119 

120def _inspect_base_image(base_image: str) -> subprocess.CompletedProcess[str]: 

121 """Inspect a local Docker image and return the raw `RepoDigests` response.""" 

122 return subprocess.run( 

123 [ 

124 "docker", 

125 "image", 

126 "inspect", 

127 base_image, 

128 "--format", 

129 "{{json .RepoDigests}}", 

130 ], 

131 check=True, 

132 capture_output=True, 

133 text=True, 

134 ) 

135 

136 

137def _pull_and_inspect_base_image(base_image: str) -> subprocess.CompletedProcess[str]: 

138 """Pull a missing base image and then inspect it locally to obtain its digest.""" 

139 try: 

140 subprocess.run( 

141 ["docker", "image", "pull", base_image], 

142 check=True, 

143 capture_output=True, 

144 text=True, 

145 ) 

146 except OSError as exc: 

147 raise LockResolutionError( 

148 "Docker is required to pull and resolve an unpinned base image. " 

149 "Pin runtime.base_image with @sha256 or make docker available." 

150 ) from exc 

151 except subprocess.CalledProcessError as exc: 

152 raise LockResolutionError( 

153 "Unable to resolve runtime.base_image locally and docker pull failed. " 

154 f"Docker said: {_docker_error_detail(exc)}" 

155 ) from exc 

156 

157 try: 

158 return _inspect_base_image(base_image) 

159 except OSError as exc: 

160 raise LockResolutionError( 

161 "Docker is required to inspect the pulled base image. " 

162 "Pin runtime.base_image with @sha256 or make docker available." 

163 ) from exc 

164 except subprocess.CalledProcessError as exc: 

165 raise LockResolutionError( 

166 "Docker pulled runtime.base_image but it still could not be inspected locally. " 

167 f"Docker said: {_docker_error_detail(exc)}" 

168 ) from exc 

169 

170 

171def _is_missing_local_image_error(exc: subprocess.CalledProcessError) -> bool: 

172 """Return whether Docker reported that the requested image is absent locally.""" 

173 detail = _docker_error_detail(exc).lower() 

174 return "no such image" in detail or "no such object" in detail 

175 

176 

177def _docker_error_detail(exc: subprocess.CalledProcessError) -> str: 

178 """Extract a human-readable stderr message from a failed Docker command.""" 

179 return exc.stderr.strip() if exc.stderr else "unknown docker error" 

180 

181 

182def _attach_digest(base_image: str, digest: str) -> str: 

183 """Attach a resolved digest to the original image reference while preserving its tag.""" 

184 return f"{base_image}@{digest}" 

185 

186 

187def _resolve_python_requirement(requirement: str) -> dict[str, str]: 

188 """Normalize a pinned Python requirement into the lockfile package schema.""" 

189 direct_match = _DIRECT_REFERENCE_PATTERN.match(requirement) 

190 if direct_match is not None: 190 ↛ 191line 190 didn't jump to line 191 because the condition on line 190 was never true

191 return { 

192 "kind": "direct", 

193 "name": direct_match.group(1).lower(), 

194 "requirement": requirement, 

195 "url": direct_match.group(2), 

196 } 

197 

198 pinned_match = _PINNED_REQUIREMENT_PATTERN.match(requirement) 

199 if pinned_match is not None: 

200 return { 

201 "kind": "pinned", 

202 "name": pinned_match.group(1).lower(), 

203 "requirement": requirement, 

204 "version": pinned_match.group(2), 

205 } 

206 

207 raise LockResolutionError( 

208 "OpenClawenv v1 locks only exact Python requirements. " 

209 f"Use 'package==version' or 'name @ URL': {requirement}" 

210 ) 

211 

212 

213def _resolve_node_requirement(requirement: str) -> dict[str, str]: 

214 """Normalize a pinned Node.js requirement into the lockfile package schema.""" 

215 pinned_match = _PINNED_NODE_REQUIREMENT_PATTERN.match(requirement) 

216 if pinned_match is not None: 

217 return { 

218 "kind": "pinned", 

219 "name": pinned_match.group("name"), 

220 "requirement": requirement, 

221 "version": pinned_match.group("version"), 

222 } 

223 

224 raise LockResolutionError( 

225 "OpenClawenv v1 locks only exact Node requirements. " 

226 f"Use 'package@version' or '@scope/package@version': {requirement}" 

227 ) 

228 

229 

230def dump_lockfile(lockfile: Lockfile) -> str: 

231 """Serialize a lockfile deterministically.""" 

232 return stable_json_dumps(lockfile.to_dict(), indent=2) + "\n" 

233 

234 

235def write_lockfile(path: str | Path, lockfile: Lockfile) -> None: 

236 """Write a lockfile to disk.""" 

237 Path(path).write_text(dump_lockfile(lockfile), encoding="utf-8") 

238 

239 

240def load_lockfile(path: str | Path) -> Lockfile: 

241 """Load a lockfile from JSON.""" 

242 lock_path = Path(path) 

243 try: 

244 data = json.loads(lock_path.read_text(encoding="utf-8")) 

245 except FileNotFoundError as exc: 

246 raise ValidationError(f"Lockfile not found: {lock_path}") from exc 

247 except json.JSONDecodeError as exc: 

248 raise ValidationError(f"Invalid JSON in {lock_path}: {exc}") from exc 

249 return parse_lockfile(data) 

250 

251 

252def parse_lockfile(data: dict[str, Any]) -> Lockfile: 

253 """Validate a parsed lockfile payload.""" 

254 if not isinstance(data, dict): 

255 raise ValidationError("Lockfile root must be an object.") 

256 required_keys = { 

257 "lock_version", 

258 "manifest_hash", 

259 "base_image", 

260 "python_packages", 

261 "node_packages", 

262 "system_packages", 

263 "source_snapshot", 

264 } 

265 missing = sorted(required_keys - set(data)) 

266 if missing: 

267 raise ValidationError(f"Lockfile is missing required keys: {', '.join(missing)}") 

268 if data["lock_version"] != 1: 268 ↛ 269line 268 didn't jump to line 269 because the condition on line 268 was never true

269 raise ValidationError("lock_version must be set to 1.") 

270 if not isinstance(data["base_image"], dict): 

271 raise ValidationError("lockfile.base_image must be an object.") 

272 if not isinstance(data["python_packages"], list): 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true

273 raise ValidationError("lockfile.python_packages must be a list.") 

274 if not isinstance(data["node_packages"], list): 274 ↛ 275line 274 didn't jump to line 275 because the condition on line 274 was never true

275 raise ValidationError("lockfile.node_packages must be a list.") 

276 if not isinstance(data["system_packages"], list): 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true

277 raise ValidationError("lockfile.system_packages must be a list.") 

278 if not isinstance(data["source_snapshot"], dict): 278 ↛ 279line 278 didn't jump to line 279 because the condition on line 278 was never true

279 raise ValidationError("lockfile.source_snapshot must be an object.") 

280 return Lockfile( 

281 lock_version=data["lock_version"], 

282 manifest_hash=_require_string(data, "manifest_hash"), 

283 base_image=data["base_image"], 

284 python_packages=data["python_packages"], 

285 node_packages=data["node_packages"], 

286 system_packages=data["system_packages"], 

287 source_snapshot=data["source_snapshot"], 

288 ) 

289 

290 

291def _require_string(data: dict[str, Any], key: str) -> str: 

292 """Require a non-empty string when validating deserialized lockfile payloads.""" 

293 value = data.get(key) 

294 if not isinstance(value, str) or not value.strip(): 

295 raise ValidationError(f"{key} must be a non-empty string.") 

296 return value