Coverage for src / openenv / manifests / loader.py: 95.92%
229 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 13:36 +0000
1"""Manifest parsing and validation."""
3from __future__ import annotations
5import re
6import tomllib
7from pathlib import Path, PurePosixPath
8from typing import Any
10from openenv.core.errors import ValidationError
11from openenv.core.models import (
12 AccessConfig,
13 AgentConfig,
14 Manifest,
15 OpenClawConfig,
16 ProjectConfig,
17 RuntimeConfig,
18 SandboxConfig,
19 SecretRef,
20 SkillConfig,
21)
22from openenv.core.skills import ensure_mandatory_skills
23from openenv.envfiles.secret_env import load_secret_refs
# Case-insensitive substrings that mark an environment variable name as
# sensitive.  `_validate_runtime` rejects any `runtime.env` key matching this
# pattern and requires such values to be declared via `runtime.secret_refs`.
_SENSITIVE_KEY_PATTERN = re.compile(
    r"(secret|token|password|api[_-]?key|access[_-]?key)", re.IGNORECASE
)
def load_manifest(path: str | Path) -> tuple[Manifest, str]:
    """Read a manifest file from disk and return it with its raw TOML text.

    Secret refs may be declared either inline (`runtime.secret_refs`) or in a
    sibling `.env` file, but never both; the sidecar file wins when present.

    Raises:
        ValidationError: If the file is missing, contains invalid TOML, or
            declares secret refs in both places.
    """
    source = Path(path)
    try:
        text = source.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise ValidationError(f"Manifest file not found: {source}") from exc
    try:
        document = tomllib.loads(text)
    except tomllib.TOMLDecodeError as exc:
        raise ValidationError(f"Invalid TOML in {source}: {exc}") from exc
    manifest = parse_manifest(document, base_dir=source.parent)
    sidecar_refs = load_secret_refs(source.parent / ".env")
    if sidecar_refs:
        if manifest.runtime.secret_refs:
            raise ValidationError(
                "Declare secret refs either in runtime.secret_refs or in a sibling .env file, not both."
            )
        manifest.runtime.secret_refs = sidecar_refs
    return manifest, text
def parse_manifest(
    data: dict[str, Any],
    *,
    base_dir: Path | None = None,
) -> Manifest:
    """Convert parsed TOML into a strongly typed manifest.

    Args:
        data: The decoded TOML document (root table).
        base_dir: Directory used to resolve markdown file references in the
            `[agent]` table; `None` disables file dereferencing.

    Raises:
        ValidationError: If any required table or field is missing, has the
            wrong type, or violates a manifest invariant.
    """
    if not isinstance(data, dict):
        raise ValidationError("Manifest root must be a TOML table.")

    schema_version = data.get("schema_version")
    if schema_version != 1:
        raise ValidationError("schema_version must be set to 1.")

    project_table = _require_table(data, "project")
    runtime_table = _require_table(data, "runtime")
    agent_table = _require_table(data, "agent")
    openclaw_table = _require_table(data, "openclaw")
    access_table = data.get("access", {})
    # Any non-table value must fail here, including falsy ones such as
    # `access = false` — a truthiness pre-check would let those slip through
    # and crash below with an AttributeError instead of a ValidationError.
    if not isinstance(access_table, dict):
        raise ValidationError("access must be a table when provided.")

    project = ProjectConfig(
        name=_require_string(project_table, "name"),
        version=_require_string(project_table, "version"),
        description=_require_string(project_table, "description"),
        runtime=_require_string(project_table, "runtime"),
    )
    if project.runtime != "openclaw":
        raise ValidationError("project.runtime must currently be 'openclaw'.")

    runtime = RuntimeConfig(
        base_image=_require_string(runtime_table, "base_image"),
        python_version=_require_string(runtime_table, "python_version"),
        system_packages=_string_list(
            runtime_table.get("system_packages", []),
            "runtime.system_packages",
        ),
        python_packages=_string_list(
            runtime_table.get("python_packages", []),
            "runtime.python_packages",
        ),
        node_packages=_string_list(
            runtime_table.get("node_packages", []),
            "runtime.node_packages",
        ),
        env=_string_map(runtime_table.get("env", {}), "runtime.env"),
        # Fall back to conventional container defaults when unset.
        user=_optional_string(runtime_table.get("user"), "runtime.user") or "root",
        workdir=_optional_string(runtime_table.get("workdir"), "runtime.workdir")
        or "/workspace",
        secret_refs=_parse_secret_refs(runtime_table.get("secret_refs", [])),
    )
    _validate_runtime(runtime)

    agent = _parse_agent_config(agent_table, base_dir=base_dir)

    skills_raw = data.get("skills", [])
    if not isinstance(skills_raw, list):
        raise ValidationError("skills must be an array of tables.")
    skills = [_parse_skill(item, index) for index, item in enumerate(skills_raw, start=1)]
    # Mandatory skills are injected before uniqueness is enforced so user
    # manifests cannot shadow them with duplicates.
    skills = ensure_mandatory_skills(skills)
    _validate_skill_names(skills)

    sandbox_table = _require_table(openclaw_table, "sandbox")
    tools_table = openclaw_table.get("tools", {})
    if not isinstance(tools_table, dict):
        raise ValidationError("openclaw.tools must be a table when provided.")

    openclaw = OpenClawConfig(
        agent_id=_require_string(openclaw_table, "agent_id"),
        agent_name=_require_string(openclaw_table, "agent_name"),
        workspace=_optional_string(openclaw_table.get("workspace"), "openclaw.workspace")
        or "/opt/openclaw/workspace",
        state_dir=_optional_string(openclaw_table.get("state_dir"), "openclaw.state_dir")
        or "/opt/openclaw",
        tools_allow=_string_list(tools_table.get("allow", []), "openclaw.tools.allow"),
        tools_deny=_string_list(tools_table.get("deny", []), "openclaw.tools.deny"),
        sandbox=SandboxConfig(
            mode=_require_string(sandbox_table, "mode"),
            scope=_require_string(sandbox_table, "scope"),
            workspace_access=_require_string(sandbox_table, "workspace_access"),
            network=_require_string(sandbox_table, "network"),
            read_only_root=_require_bool(sandbox_table, "read_only_root"),
        ),
        channels=_json_like_mapping(openclaw_table.get("channels", {}), "openclaw.channels"),
    )
    _validate_openclaw(openclaw)

    access = AccessConfig(
        websites=_string_list(access_table.get("websites", []), "access.websites"),
        databases=_string_list(access_table.get("databases", []), "access.databases"),
        notes=_string_list(access_table.get("notes", []), "access.notes"),
    )

    return Manifest(
        schema_version=schema_version,
        project=project,
        runtime=runtime,
        agent=agent,
        skills=skills,
        openclaw=openclaw,
        access=access,
    )
def _parse_secret_refs(value: Any) -> list[SecretRef]:
    """Parse inline `runtime.secret_refs` entries into typed secret references.

    Entries are 1-indexed in error messages; `required` defaults to True when
    the key is absent.
    """
    if not isinstance(value, list):
        raise ValidationError("runtime.secret_refs must be an array of tables.")
    refs: list[SecretRef] = []
    for position, entry in enumerate(value, start=1):
        prefix = f"runtime.secret_refs[{position}]"
        if not isinstance(entry, dict):
            raise ValidationError(f"{prefix} must be a table.")
        if "required" in entry:
            required = _optional_bool(entry.get("required"), f"{prefix}.required")
        else:
            required = True
        refs.append(
            SecretRef(
                name=_require_string(entry, "name", prefix=prefix),
                source=_require_string(entry, "source", prefix=prefix),
                required=required,
            )
        )
    return refs
def _parse_agent_config(
    agent_table: dict[str, Any],
    *,
    base_dir: Path | None = None,
) -> AgentConfig:
    """Parse the `[agent]` table, resolving optional markdown file references.

    The first three documents are mandatory; `identity_md` and `tools_md`
    may be omitted.  Each field yields a (content, file-ref) pair.
    """
    documents: dict[str, tuple[str | None, str | None]] = {}
    for field, mandatory in (
        ("agents_md", True),
        ("soul_md", True),
        ("user_md", True),
        ("identity_md", False),
        ("tools_md", False),
    ):
        documents[field] = _parse_agent_document(
            agent_table,
            field,
            base_dir=base_dir,
            required=mandatory,
        )
    memory_seed, memory_seed_ref = _parse_memory_seed(
        agent_table.get("memory_seed", []),
        base_dir=base_dir,
    )
    return AgentConfig(
        agents_md=documents["agents_md"][0],
        soul_md=documents["soul_md"][0],
        user_md=documents["user_md"][0],
        identity_md=documents["identity_md"][0],
        tools_md=documents["tools_md"][0],
        memory_seed=memory_seed,
        agents_md_ref=documents["agents_md"][1],
        soul_md_ref=documents["soul_md"][1],
        user_md_ref=documents["user_md"][1],
        identity_md_ref=documents["identity_md"][1],
        tools_md_ref=documents["tools_md"][1],
        memory_seed_ref=memory_seed_ref,
    )
def _parse_memory_seed(
    value: Any,
    *,
    base_dir: Path | None = None,
) -> tuple[list[str], str | None]:
    """Parse `agent.memory_seed` from inline text, a list, or a referenced markdown file.

    Returns the seed lines plus the markdown file reference, when one was used.
    """
    if not isinstance(value, str):
        # Anything other than a string must be a plain list of seed lines.
        return _string_list(value, "agent.memory_seed"), None
    if base_dir is not None and _looks_like_markdown_ref(value):
        ref = _validate_markdown_ref(value, "agent.memory_seed")
        text = _read_markdown_ref(base_dir, ref, "agent.memory_seed")
        return _split_memory_seed(text), ref
    return _split_memory_seed(value), None
249def _parse_agent_document(
250 agent_table: dict[str, Any],
251 key: str,
252 *,
253 base_dir: Path | None = None,
254 required: bool = True,
255) -> tuple[str | None, str | None]:
256 """Parse one agent markdown field and optionally dereference a sibling `.md` file."""
257 label = f"agent.{key}"
258 if key not in agent_table:
259 if required: 259 ↛ 260line 259 didn't jump to line 260 because the condition on line 259 was never true
260 raise ValidationError(f"{label} must be a non-empty string.")
261 return None, None
263 value = _optional_string(agent_table.get(key), label)
264 if value is None:
265 return None, None
266 if base_dir is not None and _looks_like_markdown_ref(value):
267 ref = _validate_markdown_ref(value, label)
268 return _read_markdown_ref(base_dir, ref, label), ref
269 return value, None
272def _split_memory_seed(value: str) -> list[str]:
273 """Normalize multiline memory seed text into a list of non-empty logical lines."""
274 return [line.rstrip() for line in value.splitlines() if line.strip()]
277def _looks_like_markdown_ref(value: str) -> bool:
278 """Return whether a manifest string should be interpreted as a markdown file path."""
279 return "\n" not in value and value.strip().lower().endswith(".md")
282def _validate_markdown_ref(value: str, label: str) -> str:
283 """Ensure a markdown file reference stays inside the manifest directory."""
284 normalized = PurePosixPath(value.replace("\\", "/"))
285 if Path(value).is_absolute() or normalized.is_absolute() or ".." in normalized.parts:
286 raise ValidationError(
287 f"{label} reference must stay within the manifest directory: {value}"
288 )
289 return value
292def _read_markdown_ref(base_dir: Path, relative_path: str, label: str) -> str:
293 """Read and validate a referenced markdown file from disk."""
294 file_path = base_dir / Path(relative_path)
295 try:
296 content = file_path.read_text(encoding="utf-8")
297 except FileNotFoundError as exc:
298 raise ValidationError(
299 f"{label} references a missing file: {relative_path}"
300 ) from exc
301 if not file_path.is_file(): 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true
302 raise ValidationError(f"{label} reference must point to a file: {relative_path}")
303 if not content.strip():
304 raise ValidationError(f"{label} file cannot be empty: {relative_path}")
305 return content
def _parse_skill(item: Any, index: int) -> SkillConfig:
    """Parse one `[[skills]]` entry, including optional inline assets and source refs."""
    prefix = f"skills[{index}]"
    if not isinstance(item, dict):
        raise ValidationError(f"{prefix} must be a table.")
    assets = _string_map(item.get("assets", {}), f"{prefix}.assets")
    for asset_path in assets:
        # Asset paths are written into the skill directory, so they must not
        # escape it via absolute paths or parent traversal.
        candidate = PurePosixPath(asset_path)
        if candidate.is_absolute() or ".." in candidate.parts:
            raise ValidationError(
                f"skills[{index}].assets path must stay within the skill directory: {asset_path}"
            )
    skill = SkillConfig(
        name=_require_string(item, "name", prefix=prefix),
        description=_require_string(item, "description", prefix=prefix),
        content=_optional_string(item.get("content"), f"{prefix}.content"),
        source=_optional_string(item.get("source"), f"{prefix}.source"),
        assets=assets,
    )
    has_content = skill.content is not None
    has_source = skill.source is not None
    if not has_content and not has_source:
        raise ValidationError(
            f"skills[{index}] must define either content or source."
        )
    if has_content and not skill.content.lstrip().startswith("---"):
        raise ValidationError(
            f"skills[{index}].content must be a full SKILL.md document with frontmatter."
        )
    return skill
def _validate_runtime(runtime: RuntimeConfig) -> None:
    """Validate runtime invariants that cannot be expressed by TOML typing alone."""
    if not runtime.base_image:
        raise ValidationError("runtime.base_image cannot be empty.")
    if not PurePosixPath(runtime.workdir).is_absolute():
        raise ValidationError("runtime.workdir must be an absolute POSIX path.")
    for name, content in runtime.env.items():
        # Secrets never travel through plain env vars; they must use refs.
        if _SENSITIVE_KEY_PATTERN.search(name) is not None:
            raise ValidationError(
                "Sensitive environment variables must be declared via "
                "runtime.secret_refs, not runtime.env."
            )
        if not content:
            raise ValidationError(f"runtime.env.{name} cannot be empty.")
353def _validate_openclaw(config: OpenClawConfig) -> None:
354 """Validate OpenClaw-specific path invariants after parsing defaults."""
355 if not PurePosixPath(config.workspace).is_absolute():
356 raise ValidationError("openclaw.workspace must be an absolute POSIX path.")
357 if not PurePosixPath(config.state_dir).is_absolute():
358 raise ValidationError("openclaw.state_dir must be an absolute POSIX path.")
359 allow_set = set(config.tools_allow)
360 deny_set = set(config.tools_deny)
361 overlapping = sorted(allow_set & deny_set)
362 if overlapping:
363 raise ValidationError(
364 "openclaw.tools.allow and openclaw.tools.deny cannot overlap: "
365 + ", ".join(overlapping)
366 )
369def _validate_skill_names(skills: list[SkillConfig]) -> None:
370 """Reject duplicate skill names so workspace paths remain unique."""
371 seen: set[str] = set()
372 for skill in skills:
373 if skill.name in seen:
374 raise ValidationError(f"Duplicate skill name: {skill.name}")
375 seen.add(skill.name)
def _json_like_mapping(value: Any, label: str) -> dict[str, Any]:
    """Validate and normalize a TOML table into JSON-compatible nested data."""
    # Absent or empty channels collapse to an empty mapping.
    if value is None or value == {}:
        return {}
    if not isinstance(value, dict):
        raise ValidationError(f"{label} must be a table when provided.")
    result: dict[str, Any] = {}
    for name, entry in value.items():
        if not (isinstance(name, str) and name.strip()):
            raise ValidationError(f"{label} keys must be non-empty strings.")
        result[name] = _json_like_value(entry, f"{label}.{name}")
    return result
392def _json_like_value(value: Any, label: str) -> Any:
393 """Validate values that will be serialized into `openclaw.json`."""
394 if isinstance(value, bool):
395 return value
396 if isinstance(value, (str, int, float)):
397 return value
398 if isinstance(value, list):
399 return [_json_like_value(item, f"{label}[{index}]") for index, item in enumerate(value)]
400 if isinstance(value, dict): 400 ↛ 407line 400 didn't jump to line 407 because the condition on line 400 was always true
401 normalized: dict[str, Any] = {}
402 for key, item in value.items():
403 if not isinstance(key, str) or not key.strip(): 403 ↛ 404line 403 didn't jump to line 404 because the condition on line 403 was never true
404 raise ValidationError(f"{label} keys must be non-empty strings.")
405 normalized[key] = _json_like_value(item, f"{label}.{key}")
406 return normalized
407 raise ValidationError(
408 f"{label} contains an unsupported value type for openclaw.json: {type(value).__name__}"
409 )
412def _require_table(
413 data: dict[str, Any],
414 key: str,
415 *,
416 prefix: str | None = None,
417) -> dict[str, Any]:
418 """Require a nested TOML table and raise a labeled validation error when missing."""
419 value = data.get(key)
420 if not isinstance(value, dict):
421 label = f"{prefix}.{key}" if prefix else key
422 raise ValidationError(f"{label} must be a table.")
423 return value
426def _require_string(
427 data: dict[str, Any],
428 key: str,
429 *,
430 prefix: str | None = None,
431) -> str:
432 """Require a non-empty string field from a parsed TOML table."""
433 label = f"{prefix}.{key}" if prefix else key
434 value = data.get(key)
435 if not isinstance(value, str) or not value.strip():
436 raise ValidationError(f"{label} must be a non-empty string.")
437 return value
440def _optional_string(value: Any, label: str) -> str | None:
441 """Validate an optional string field, returning `None` when it is absent."""
442 if value is None:
443 return None
444 if not isinstance(value, str) or not value.strip():
445 raise ValidationError(f"{label} must be a non-empty string when provided.")
446 return value
449def _require_bool(data: dict[str, Any], key: str, *, prefix: str | None = None) -> bool:
450 """Require a boolean field from a parsed TOML table."""
451 label = f"{prefix}.{key}" if prefix else key
452 value = data.get(key)
453 if not isinstance(value, bool):
454 raise ValidationError(f"{label} must be a boolean.")
455 return value
458def _optional_bool(value: Any, label: str) -> bool:
459 """Validate an optional boolean field that is present in the source payload."""
460 if not isinstance(value, bool):
461 raise ValidationError(f"{label} must be a boolean.")
462 return value
465def _string_list(value: Any, label: str) -> list[str]:
466 """Validate that a manifest field is a list of non-empty strings."""
467 if not isinstance(value, list):
468 raise ValidationError(f"{label} must be a list of strings.")
469 if not all(isinstance(item, str) and item.strip() for item in value):
470 raise ValidationError(f"{label} must contain only non-empty strings.")
471 return list(value)
474def _string_map(value: Any, label: str) -> dict[str, str]:
475 """Validate that a manifest field is a table whose keys and values are strings."""
476 if not isinstance(value, dict):
477 raise ValidationError(f"{label} must be a table of string values.")
478 result: dict[str, str] = {}
479 for key, item in value.items():
480 if not isinstance(key, str) or not key.strip():
481 raise ValidationError(f"{label} keys must be non-empty strings.")
482 if not isinstance(item, str):
483 raise ValidationError(f"{label}.{key} must be a string.")
484 result[key] = item
485 return result