Coverage for src / openenv / manifests / loader.py: 95.92%

229 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-25 13:36 +0000

1"""Manifest parsing and validation.""" 

2 

3from __future__ import annotations 

4 

5import re 

6import tomllib 

7from pathlib import Path, PurePosixPath 

8from typing import Any 

9 

10from openenv.core.errors import ValidationError 

11from openenv.core.models import ( 

12 AccessConfig, 

13 AgentConfig, 

14 Manifest, 

15 OpenClawConfig, 

16 ProjectConfig, 

17 RuntimeConfig, 

18 SandboxConfig, 

19 SecretRef, 

20 SkillConfig, 

21) 

22from openenv.core.skills import ensure_mandatory_skills 

23from openenv.envfiles.secret_env import load_secret_refs 

24 

25_SENSITIVE_KEY_PATTERN = re.compile( 

26 r"(secret|token|password|api[_-]?key|access[_-]?key)", re.IGNORECASE 

27) 

28 

29 

def load_manifest(path: str | Path) -> tuple[Manifest, str]:
    """Load a manifest from disk and return it with the raw TOML text.

    The file is decoded as UTF-8, parsed as TOML, and handed to
    `parse_manifest` with the manifest's directory as `base_dir`. Whatever
    `load_secret_refs` returns for the sibling `.env` path is attached to
    `manifest.runtime.secret_refs`, unless inline refs were declared too.

    Raises:
        ValidationError: if the file is missing, the TOML is invalid, or
            secret refs are declared both inline and in the sidecar file.
    """
    source = Path(path)
    try:
        raw_text = source.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise ValidationError(f"Manifest file not found: {source}") from exc

    try:
        parsed = tomllib.loads(raw_text)
    except tomllib.TOMLDecodeError as exc:
        raise ValidationError(f"Invalid TOML in {source}: {exc}") from exc

    manifest_dir = source.parent
    manifest = parse_manifest(parsed, base_dir=manifest_dir)

    env_refs = load_secret_refs(manifest_dir / ".env")
    if not env_refs:
        return manifest, raw_text

    # The two declaration styles are mutually exclusive.
    if manifest.runtime.secret_refs:
        raise ValidationError(
            "Declare secret refs either in runtime.secret_refs or in a sibling .env file, not both."
        )
    manifest.runtime.secret_refs = env_refs
    return manifest, raw_text

50 

51 

def parse_manifest(
    data: dict[str, Any],
    *,
    base_dir: Path | None = None,
) -> Manifest:
    """Convert parsed TOML into a strongly typed manifest.

    Args:
        data: The manifest root table as produced by a TOML parser.
        base_dir: Directory used to resolve markdown file references in the
            `[agent]` table. When `None`, such strings are kept as inline text.

    Returns:
        The validated `Manifest`.

    Raises:
        ValidationError: if any required table or field is missing, has the
            wrong type, or violates a manifest invariant.
    """
    if not isinstance(data, dict):
        raise ValidationError("Manifest root must be a TOML table.")

    schema_version = data.get("schema_version")
    # `True == 1` and `1.0 == 1` in Python, so a plain equality test would
    # accept `schema_version = true` or `1.0`; require a real integer.
    if (
        isinstance(schema_version, bool)
        or not isinstance(schema_version, int)
        or schema_version != 1
    ):
        raise ValidationError("schema_version must be set to 1.")

    project_table = _require_table(data, "project")
    runtime_table = _require_table(data, "runtime")
    agent_table = _require_table(data, "agent")
    openclaw_table = _require_table(data, "openclaw")

    # `access` is optional, but any value that is provided must be a table.
    # Checking the type unconditionally also rejects falsy non-tables
    # ("", 0, [], false) instead of failing later on `.get`.
    access_table = data.get("access")
    if access_table is None:
        access_table = {}
    elif not isinstance(access_table, dict):
        raise ValidationError("access must be a table when provided.")

    project = ProjectConfig(
        name=_require_string(project_table, "name"),
        version=_require_string(project_table, "version"),
        description=_require_string(project_table, "description"),
        runtime=_require_string(project_table, "runtime"),
    )
    if project.runtime != "openclaw":
        raise ValidationError("project.runtime must currently be 'openclaw'.")

    runtime = RuntimeConfig(
        base_image=_require_string(runtime_table, "base_image"),
        python_version=_require_string(runtime_table, "python_version"),
        system_packages=_string_list(
            runtime_table.get("system_packages", []),
            "runtime.system_packages",
        ),
        python_packages=_string_list(
            runtime_table.get("python_packages", []),
            "runtime.python_packages",
        ),
        node_packages=_string_list(
            runtime_table.get("node_packages", []),
            "runtime.node_packages",
        ),
        env=_string_map(runtime_table.get("env", {}), "runtime.env"),
        user=_optional_string(runtime_table.get("user"), "runtime.user") or "root",
        workdir=_optional_string(runtime_table.get("workdir"), "runtime.workdir")
        or "/workspace",
        secret_refs=_parse_secret_refs(runtime_table.get("secret_refs", [])),
    )
    _validate_runtime(runtime)

    agent = _parse_agent_config(agent_table, base_dir=base_dir)

    skills_raw = data.get("skills", [])
    if not isinstance(skills_raw, list):
        raise ValidationError("skills must be an array of tables.")
    skills = [_parse_skill(item, index) for index, item in enumerate(skills_raw, start=1)]
    skills = ensure_mandatory_skills(skills)
    # Checked after the mandatory skills are merged in, so the merged list is
    # what must have unique names.
    _validate_skill_names(skills)

    sandbox_table = _require_table(openclaw_table, "sandbox")
    tools_table = openclaw_table.get("tools", {})
    if not isinstance(tools_table, dict):
        raise ValidationError("openclaw.tools must be a table when provided.")

    openclaw = OpenClawConfig(
        agent_id=_require_string(openclaw_table, "agent_id"),
        agent_name=_require_string(openclaw_table, "agent_name"),
        workspace=_optional_string(openclaw_table.get("workspace"), "openclaw.workspace")
        or "/opt/openclaw/workspace",
        state_dir=_optional_string(openclaw_table.get("state_dir"), "openclaw.state_dir")
        or "/opt/openclaw",
        tools_allow=_string_list(tools_table.get("allow", []), "openclaw.tools.allow"),
        tools_deny=_string_list(tools_table.get("deny", []), "openclaw.tools.deny"),
        sandbox=SandboxConfig(
            mode=_require_string(sandbox_table, "mode"),
            scope=_require_string(sandbox_table, "scope"),
            workspace_access=_require_string(sandbox_table, "workspace_access"),
            network=_require_string(sandbox_table, "network"),
            read_only_root=_require_bool(sandbox_table, "read_only_root"),
        ),
        channels=_json_like_mapping(openclaw_table.get("channels", {}), "openclaw.channels"),
    )
    _validate_openclaw(openclaw)

    access = AccessConfig(
        websites=_string_list(access_table.get("websites", []), "access.websites"),
        databases=_string_list(access_table.get("databases", []), "access.databases"),
        notes=_string_list(access_table.get("notes", []), "access.notes"),
    )

    return Manifest(
        schema_version=schema_version,
        project=project,
        runtime=runtime,
        agent=agent,
        skills=skills,
        openclaw=openclaw,
        access=access,
    )

154 

155 

156def _parse_secret_refs(value: Any) -> list[SecretRef]: 

157 """Parse inline `runtime.secret_refs` entries into typed secret references.""" 

158 if not isinstance(value, list): 

159 raise ValidationError("runtime.secret_refs must be an array of tables.") 

160 secret_refs: list[SecretRef] = [] 

161 for index, item in enumerate(value, start=1): 

162 if not isinstance(item, dict): 

163 raise ValidationError(f"runtime.secret_refs[{index}] must be a table.") 

164 secret_refs.append( 

165 SecretRef( 

166 name=_require_string(item, "name", prefix=f"runtime.secret_refs[{index}]"), 

167 source=_require_string(item, "source", prefix=f"runtime.secret_refs[{index}]"), 

168 required=( 

169 _optional_bool( 

170 item.get("required"), 

171 f"runtime.secret_refs[{index}].required", 

172 ) 

173 if "required" in item 

174 else True 

175 ), 

176 ) 

177 ) 

178 return secret_refs 

179 

180 

def _parse_agent_config(
    agent_table: dict[str, Any],
    *,
    base_dir: Path | None = None,
) -> AgentConfig:
    """Build an `AgentConfig` from the `[agent]` table.

    The `agents_md`, `soul_md`, and `user_md` documents are required;
    `identity_md` and `tools_md` are optional. Each field yields a text value
    and a `<field>_ref` value from `_parse_agent_document`. `memory_seed` is
    parsed separately and defaults to an empty list.
    """
    # (field name, required) pairs, in the order they are validated.
    document_fields = (
        ("agents_md", True),
        ("soul_md", True),
        ("user_md", True),
        ("identity_md", False),
        ("tools_md", False),
    )

    documents: dict[str, str | None] = {}
    for field_name, is_required in document_fields:
        text, ref = _parse_agent_document(
            agent_table,
            field_name,
            base_dir=base_dir,
            required=is_required,
        )
        documents[field_name] = text
        documents[f"{field_name}_ref"] = ref

    memory_seed, memory_seed_ref = _parse_memory_seed(
        agent_table.get("memory_seed", []),
        base_dir=base_dir,
    )
    return AgentConfig(
        memory_seed=memory_seed,
        memory_seed_ref=memory_seed_ref,
        **documents,
    )

232 

233 

def _parse_memory_seed(
    value: Any,
    *,
    base_dir: Path | None = None,
) -> tuple[list[str], str | None]:
    """Parse `agent.memory_seed` into a list of entries and an optional file ref.

    Non-string values must already be a list of non-empty strings. A string
    is split into lines. When `base_dir` is given and the string looks like a
    markdown path, the referenced file is read and its content is split.

    Returns:
        `(entries, ref)`, where `ref` is the validated reference string when
        the seed came from a file and `None` otherwise.
    """
    label = "agent.memory_seed"
    if not isinstance(value, str):
        return _string_list(value, label), None

    # Without a base directory there is nothing to resolve against, so the
    # string is always treated as inline text.
    if base_dir is None or not _looks_like_markdown_ref(value):
        return _split_memory_seed(value), None

    ref = _validate_markdown_ref(value, label)
    file_text = _read_markdown_ref(base_dir, ref, label)
    return _split_memory_seed(file_text), ref

247 

248 

249def _parse_agent_document( 

250 agent_table: dict[str, Any], 

251 key: str, 

252 *, 

253 base_dir: Path | None = None, 

254 required: bool = True, 

255) -> tuple[str | None, str | None]: 

256 """Parse one agent markdown field and optionally dereference a sibling `.md` file.""" 

257 label = f"agent.{key}" 

258 if key not in agent_table: 

259 if required: 259 ↛ 260line 259 didn't jump to line 260 because the condition on line 259 was never true

260 raise ValidationError(f"{label} must be a non-empty string.") 

261 return None, None 

262 

263 value = _optional_string(agent_table.get(key), label) 

264 if value is None: 

265 return None, None 

266 if base_dir is not None and _looks_like_markdown_ref(value): 

267 ref = _validate_markdown_ref(value, label) 

268 return _read_markdown_ref(base_dir, ref, label), ref 

269 return value, None 

270 

271 

272def _split_memory_seed(value: str) -> list[str]: 

273 """Normalize multiline memory seed text into a list of non-empty logical lines.""" 

274 return [line.rstrip() for line in value.splitlines() if line.strip()] 

275 

276 

277def _looks_like_markdown_ref(value: str) -> bool: 

278 """Return whether a manifest string should be interpreted as a markdown file path.""" 

279 return "\n" not in value and value.strip().lower().endswith(".md") 

280 

281 

282def _validate_markdown_ref(value: str, label: str) -> str: 

283 """Ensure a markdown file reference stays inside the manifest directory.""" 

284 normalized = PurePosixPath(value.replace("\\", "/")) 

285 if Path(value).is_absolute() or normalized.is_absolute() or ".." in normalized.parts: 

286 raise ValidationError( 

287 f"{label} reference must stay within the manifest directory: {value}" 

288 ) 

289 return value 

290 

291 

292def _read_markdown_ref(base_dir: Path, relative_path: str, label: str) -> str: 

293 """Read and validate a referenced markdown file from disk.""" 

294 file_path = base_dir / Path(relative_path) 

295 try: 

296 content = file_path.read_text(encoding="utf-8") 

297 except FileNotFoundError as exc: 

298 raise ValidationError( 

299 f"{label} references a missing file: {relative_path}" 

300 ) from exc 

301 if not file_path.is_file(): 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true

302 raise ValidationError(f"{label} reference must point to a file: {relative_path}") 

303 if not content.strip(): 

304 raise ValidationError(f"{label} file cannot be empty: {relative_path}") 

305 return content 

306 

307 

def _parse_skill(item: Any, index: int) -> SkillConfig:
    """Parse one `[[skills]]` entry, including optional inline assets and source refs.

    Args:
        item: The raw entry from the `skills` array.
        index: 1-based position of the entry, used in error messages.

    Returns:
        The parsed `SkillConfig`.

    Raises:
        ValidationError: if the entry is not a table, an asset path is
            absolute or contains `..`, a field fails validation, neither
            `content` nor `source` is set, or `content` does not start with
            a `---` frontmatter marker.
    """
    if not isinstance(item, dict):
        raise ValidationError(f"skills[{index}] must be a table.")
    assets = _string_map(item.get("assets", {}), f"skills[{index}].assets")
    for asset_path in assets:
        # Treat backslashes as separators, as the markdown reference check
        # does, so `..\x` cannot get past the `..` test.
        normalized = PurePosixPath(asset_path.replace("\\", "/"))
        if normalized.is_absolute() or ".." in normalized.parts:
            raise ValidationError(
                f"skills[{index}].assets path must stay within the skill directory: {asset_path}"
            )
    skill = SkillConfig(
        name=_require_string(item, "name", prefix=f"skills[{index}]"),
        description=_require_string(item, "description", prefix=f"skills[{index}]"),
        content=_optional_string(item.get("content"), f"skills[{index}].content"),
        source=_optional_string(item.get("source"), f"skills[{index}].source"),
        assets=assets,
    )
    if skill.content is None and skill.source is None:
        raise ValidationError(
            f"skills[{index}] must define either content or source."
        )
    # Inline content must open with a `---` frontmatter marker.
    if skill.content is not None and not skill.content.lstrip().startswith("---"):
        raise ValidationError(
            f"skills[{index}].content must be a full SKILL.md document with frontmatter."
        )
    return skill

335 

336 

337def _validate_runtime(runtime: RuntimeConfig) -> None: 

338 """Validate runtime invariants that cannot be expressed by TOML typing alone.""" 

339 if not runtime.base_image: 

340 raise ValidationError("runtime.base_image cannot be empty.") 

341 if not PurePosixPath(runtime.workdir).is_absolute(): 

342 raise ValidationError("runtime.workdir must be an absolute POSIX path.") 

343 for key, value in runtime.env.items(): 

344 if _SENSITIVE_KEY_PATTERN.search(key): 

345 raise ValidationError( 

346 "Sensitive environment variables must be declared via " 

347 "runtime.secret_refs, not runtime.env." 

348 ) 

349 if not value: 

350 raise ValidationError(f"runtime.env.{key} cannot be empty.") 

351 

352 

353def _validate_openclaw(config: OpenClawConfig) -> None: 

354 """Validate OpenClaw-specific path invariants after parsing defaults.""" 

355 if not PurePosixPath(config.workspace).is_absolute(): 

356 raise ValidationError("openclaw.workspace must be an absolute POSIX path.") 

357 if not PurePosixPath(config.state_dir).is_absolute(): 

358 raise ValidationError("openclaw.state_dir must be an absolute POSIX path.") 

359 allow_set = set(config.tools_allow) 

360 deny_set = set(config.tools_deny) 

361 overlapping = sorted(allow_set & deny_set) 

362 if overlapping: 

363 raise ValidationError( 

364 "openclaw.tools.allow and openclaw.tools.deny cannot overlap: " 

365 + ", ".join(overlapping) 

366 ) 

367 

368 

369def _validate_skill_names(skills: list[SkillConfig]) -> None: 

370 """Reject duplicate skill names so workspace paths remain unique.""" 

371 seen: set[str] = set() 

372 for skill in skills: 

373 if skill.name in seen: 

374 raise ValidationError(f"Duplicate skill name: {skill.name}") 

375 seen.add(skill.name) 

376 

377 

378def _json_like_mapping(value: Any, label: str) -> dict[str, Any]: 

379 """Validate and normalize a TOML table into JSON-compatible nested data.""" 

380 if value in ({}, None): 

381 return {} 

382 if not isinstance(value, dict): 382 ↛ 383line 382 didn't jump to line 383 because the condition on line 382 was never true

383 raise ValidationError(f"{label} must be a table when provided.") 

384 normalized: dict[str, Any] = {} 

385 for key, item in value.items(): 

386 if not isinstance(key, str) or not key.strip(): 386 ↛ 387line 386 didn't jump to line 387 because the condition on line 386 was never true

387 raise ValidationError(f"{label} keys must be non-empty strings.") 

388 normalized[key] = _json_like_value(item, f"{label}.{key}") 

389 return normalized 

390 

391 

392def _json_like_value(value: Any, label: str) -> Any: 

393 """Validate values that will be serialized into `openclaw.json`.""" 

394 if isinstance(value, bool): 

395 return value 

396 if isinstance(value, (str, int, float)): 

397 return value 

398 if isinstance(value, list): 

399 return [_json_like_value(item, f"{label}[{index}]") for index, item in enumerate(value)] 

400 if isinstance(value, dict): 400 ↛ 407line 400 didn't jump to line 407 because the condition on line 400 was always true

401 normalized: dict[str, Any] = {} 

402 for key, item in value.items(): 

403 if not isinstance(key, str) or not key.strip(): 403 ↛ 404line 403 didn't jump to line 404 because the condition on line 403 was never true

404 raise ValidationError(f"{label} keys must be non-empty strings.") 

405 normalized[key] = _json_like_value(item, f"{label}.{key}") 

406 return normalized 

407 raise ValidationError( 

408 f"{label} contains an unsupported value type for openclaw.json: {type(value).__name__}" 

409 ) 

410 

411 

412def _require_table( 

413 data: dict[str, Any], 

414 key: str, 

415 *, 

416 prefix: str | None = None, 

417) -> dict[str, Any]: 

418 """Require a nested TOML table and raise a labeled validation error when missing.""" 

419 value = data.get(key) 

420 if not isinstance(value, dict): 

421 label = f"{prefix}.{key}" if prefix else key 

422 raise ValidationError(f"{label} must be a table.") 

423 return value 

424 

425 

426def _require_string( 

427 data: dict[str, Any], 

428 key: str, 

429 *, 

430 prefix: str | None = None, 

431) -> str: 

432 """Require a non-empty string field from a parsed TOML table.""" 

433 label = f"{prefix}.{key}" if prefix else key 

434 value = data.get(key) 

435 if not isinstance(value, str) or not value.strip(): 

436 raise ValidationError(f"{label} must be a non-empty string.") 

437 return value 

438 

439 

440def _optional_string(value: Any, label: str) -> str | None: 

441 """Validate an optional string field, returning `None` when it is absent.""" 

442 if value is None: 

443 return None 

444 if not isinstance(value, str) or not value.strip(): 

445 raise ValidationError(f"{label} must be a non-empty string when provided.") 

446 return value 

447 

448 

449def _require_bool(data: dict[str, Any], key: str, *, prefix: str | None = None) -> bool: 

450 """Require a boolean field from a parsed TOML table.""" 

451 label = f"{prefix}.{key}" if prefix else key 

452 value = data.get(key) 

453 if not isinstance(value, bool): 

454 raise ValidationError(f"{label} must be a boolean.") 

455 return value 

456 

457 

458def _optional_bool(value: Any, label: str) -> bool: 

459 """Validate an optional boolean field that is present in the source payload.""" 

460 if not isinstance(value, bool): 

461 raise ValidationError(f"{label} must be a boolean.") 

462 return value 

463 

464 

465def _string_list(value: Any, label: str) -> list[str]: 

466 """Validate that a manifest field is a list of non-empty strings.""" 

467 if not isinstance(value, list): 

468 raise ValidationError(f"{label} must be a list of strings.") 

469 if not all(isinstance(item, str) and item.strip() for item in value): 

470 raise ValidationError(f"{label} must contain only non-empty strings.") 

471 return list(value) 

472 

473 

474def _string_map(value: Any, label: str) -> dict[str, str]: 

475 """Validate that a manifest field is a table whose keys and values are strings.""" 

476 if not isinstance(value, dict): 

477 raise ValidationError(f"{label} must be a table of string values.") 

478 result: dict[str, str] = {} 

479 for key, item in value.items(): 

480 if not isinstance(key, str) or not key.strip(): 

481 raise ValidationError(f"{label} keys must be non-empty strings.") 

482 if not isinstance(item, str): 

483 raise ValidationError(f"{label}.{key} must be a string.") 

484 result[key] = item 

485 return result