Coverage for src / openenv / cli.py: 93.51%

216 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-25 13:36 +0000

1"""Command line interface for OpenClawenv.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import sys 

7from pathlib import Path 

8 

9from loguru import logger 

10 

11from openenv.bots.manager import interactive_menu 

12from openenv.core.errors import CommandError, OpenEnvError 

13from openenv.core.models import Lockfile, Manifest 

14from openenv.core.security import assess_manifest_security, assess_runtime_env_security 

15from openenv.docker.builder import build_image_with_args, default_image_tag 

16from openenv.docker.compose import ( 

17 default_compose_filename, 

18 default_env_filename, 

19 materialize_runtime_mount_tree, 

20 render_compose, 

21 render_env_file, 

22 write_compose, 

23 write_env_file, 

24) 

25from openenv.docker.dockerfile import ( 

26 DEFAULT_SKILL_SCAN_FAIL_ON_SEVERITY, 

27 DEFAULT_SKILL_SCAN_FORMAT, 

28 DEFAULT_SKILL_SCAN_POLICY, 

29 render_dockerfile, 

30) 

31from openenv.envfiles.secret_env import load_secret_values, secret_env_path 

32from openenv.integrations.scanner import run_skill_scanner 

33from openenv.manifests.loader import load_manifest 

34from openenv.manifests.lockfile import ( 

35 build_lockfile, 

36 dump_lockfile, 

37 load_lockfile, 

38 write_lockfile, 

39) 

40from openenv.templates.sample import SAMPLE_MANIFEST 

41 

42 

43DEFAULT_MANIFEST_FILENAME = "openclawenv.toml" 

44LEGACY_MANIFEST_FILENAME = "openenv.toml" 

45DEFAULT_LOCKFILE_FILENAME = "openclawenv.lock" 

46LEGACY_LOCKFILE_FILENAME = "openenv.lock" 

47DEFAULT_SCAN_ARTIFACTS_DIRNAME = ".openclawenv-scan" 

48 

49 

50def build_parser() -> argparse.ArgumentParser: 

51 """Create the CLI parser.""" 

52 parser = argparse.ArgumentParser(prog="clawopenenv", description="OpenClawenv CLI") 

53 subparsers = parser.add_subparsers(dest="command", required=True) 

54 

55 init_parser = subparsers.add_parser("init", help=f"Create a starter {DEFAULT_MANIFEST_FILENAME}") 

56 init_parser.add_argument("--path", default=DEFAULT_MANIFEST_FILENAME, help="Manifest output path") 

57 init_parser.add_argument( 

58 "--force", 

59 action="store_true", 

60 help="Overwrite an existing manifest file", 

61 ) 

62 

63 validate_parser = subparsers.add_parser("validate", help=f"Validate {DEFAULT_MANIFEST_FILENAME}") 

64 validate_parser.add_argument("--path", default=DEFAULT_MANIFEST_FILENAME, help="Manifest path") 

65 

66 lock_parser = subparsers.add_parser("lock", help=f"Generate {DEFAULT_LOCKFILE_FILENAME}") 

67 lock_parser.add_argument("--path", default=DEFAULT_MANIFEST_FILENAME, help="Manifest path") 

68 lock_parser.add_argument("--output", default=DEFAULT_LOCKFILE_FILENAME, help="Lockfile output path") 

69 

70 scan_parser = subparsers.add_parser("scan", help="Run skill-scanner against inline skills") 

71 scan_parser.add_argument("--path", default=DEFAULT_MANIFEST_FILENAME, help="Manifest path") 

72 scan_parser.add_argument( 

73 "--scanner-bin", 

74 default="skill-scanner", 

75 help="Path to the skill-scanner executable", 

76 ) 

77 scan_parser.add_argument( 

78 "--keep-artifacts", 

79 action="store_true", 

80 help=f"Keep the materialized skill directory in {DEFAULT_SCAN_ARTIFACTS_DIRNAME}", 

81 ) 

82 scan_parser.add_argument( 

83 "scanner_args", 

84 nargs=argparse.REMAINDER, 

85 help="Additional arguments passed to skill-scanner after --", 

86 ) 

87 

88 export_parser = subparsers.add_parser("export", help="Export generated artifacts") 

89 export_subparsers = export_parser.add_subparsers(dest="export_command", required=True) 

90 dockerfile_parser = export_subparsers.add_parser( 

91 "dockerfile", 

92 help="Render the deterministic Dockerfile", 

93 ) 

94 dockerfile_parser.add_argument("--path", default=DEFAULT_MANIFEST_FILENAME, help="Manifest path") 

95 dockerfile_parser.add_argument("--lock", default=DEFAULT_LOCKFILE_FILENAME, help="Lockfile path") 

96 dockerfile_parser.add_argument("--output", help="Optional Dockerfile output path") 

97 compose_parser = export_subparsers.add_parser( 

98 "compose", 

99 help="Render the docker-compose file for the bot image", 

100 ) 

101 compose_parser.add_argument("--path", default=DEFAULT_MANIFEST_FILENAME, help="Manifest path") 

102 compose_parser.add_argument("--lock", default=DEFAULT_LOCKFILE_FILENAME, help="Lockfile path") 

103 compose_parser.add_argument("--tag", help="Docker image tag to reference") 

104 compose_parser.add_argument("--output", help="Optional compose output path") 

105 

106 build_parser_cmd = subparsers.add_parser("build", help="Build the Docker image") 

107 build_parser_cmd.add_argument("--path", default=DEFAULT_MANIFEST_FILENAME, help="Manifest path") 

108 build_parser_cmd.add_argument("--lock", default=DEFAULT_LOCKFILE_FILENAME, help="Lockfile path") 

109 build_parser_cmd.add_argument("--tag", help="Docker image tag") 

110 build_parser_cmd.add_argument( 

111 "--scan-format", 

112 default=DEFAULT_SKILL_SCAN_FORMAT, 

113 help="Build-time skill scan format passed to the Dockerfile", 

114 ) 

115 build_parser_cmd.add_argument( 

116 "--scan-policy", 

117 default=DEFAULT_SKILL_SCAN_POLICY, 

118 help="Build-time skill scan policy passed to the Dockerfile", 

119 ) 

120 build_parser_cmd.add_argument( 

121 "--scan-fail-on-severity", 

122 default=DEFAULT_SKILL_SCAN_FAIL_ON_SEVERITY, 

123 help="Build-time skill scan severity threshold passed to the Dockerfile", 

124 ) 

125 

126 return parser 

127 

128 

129def main(argv: list[str] | None = None) -> int: 

130 """Program entry point.""" 

131 _configure_logging() 

132 argv = sys.argv[1:] if argv is None else argv 

133 if not argv: 

134 return interactive_menu(Path.cwd()) 

135 parser = build_parser() 

136 args = parser.parse_args(argv) 

137 try: 

138 if args.command == "init": 

139 return _handle_init(args) 

140 if args.command == "validate": 

141 return _handle_validate(args) 

142 if args.command == "lock": 

143 return _handle_lock(args) 

144 if args.command == "scan": 

145 return _handle_scan(args) 

146 if args.command == "export" and args.export_command == "dockerfile": 

147 return _handle_export_dockerfile(args) 

148 if args.command == "export" and args.export_command == "compose": 

149 return _handle_export_compose(args) 

150 if args.command == "build": 150 ↛ 152line 150 didn't jump to line 152 because the condition on line 150 was always true

151 return _handle_build(args) 

152 parser.error("unknown command") 

153 except CommandError as exc: 

154 logger.error("error: {}", exc) 

155 return exc.exit_code or 1 

156 except OpenEnvError as exc: 

157 logger.error("error: {}", exc) 

158 return 1 

159 return 0 

160 

161 

162def _handle_init(args: argparse.Namespace) -> int: 

163 """Create a starter manifest file at the requested path.""" 

164 path = Path(args.path) 

165 if path.exists() and not args.force: 

166 raise OpenEnvError(f"Refusing to overwrite existing file: {path}") 

167 path.write_text(SAMPLE_MANIFEST, encoding="utf-8") 

168 logger.info("Created starter manifest at {}", path) 

169 return 0 

170 

171 

172def _handle_validate(args: argparse.Namespace) -> int: 

173 """Validate a manifest and print a short summary for the operator.""" 

174 manifest, _ = load_manifest(_resolve_manifest_path_argument(args.path)) 

175 _log_manifest_security_advisories(manifest) 

176 logger.info( 

177 "Manifest valid: " 

178 f"{manifest.project.name} {manifest.project.version} " 

179 f"with {len(manifest.runtime.python_packages)} Python packages " 

180 f"and {len(manifest.skills)} skill(s)." 

181 ) 

182 return 0 

183 

184 

185def _handle_lock(args: argparse.Namespace) -> int: 

186 """Resolve the manifest into a deterministic lockfile and write it to disk.""" 

187 manifest_path = _resolve_manifest_path_argument(args.path) 

188 manifest, raw_manifest_text = load_manifest(manifest_path) 

189 _log_manifest_security_advisories(manifest) 

190 lockfile = build_lockfile(manifest, raw_manifest_text) 

191 write_lockfile(args.output, lockfile) 

192 logger.info("Wrote lockfile to {}", args.output) 

193 return 0 

194 

195 

196def _handle_scan(args: argparse.Namespace) -> int: 

197 """Materialize skills from the manifest and run the external skill scanner.""" 

198 manifest_path = _resolve_manifest_path_argument(args.path) 

199 manifest, _ = load_manifest(manifest_path) 

200 _log_manifest_security_advisories(manifest) 

201 scan_dir = run_skill_scanner( 

202 manifest_path, 

203 manifest, 

204 scanner_bin=args.scanner_bin, 

205 scanner_args=args.scanner_args, 

206 keep_artifacts=args.keep_artifacts, 

207 ) 

208 if scan_dir is not None: 208 ↛ 210line 208 didn't jump to line 210 because the condition on line 208 was always true

209 logger.info("Kept materialized scan artifacts in {}", scan_dir) 

210 logger.info("skill-scanner completed successfully") 

211 return 0 

212 

213 

214def _handle_export_dockerfile(args: argparse.Namespace) -> int: 

215 """Render the locked Dockerfile and either print it or save it to disk.""" 

216 if args.output: 

217 manifest, _ = load_manifest(_resolve_manifest_path_argument(args.path)) 

218 _log_manifest_security_advisories(manifest) 

219 dockerfile_text = _render_locked_dockerfile( 

220 manifest_path=args.path, 

221 lock_path=args.lock, 

222 ) 

223 if args.output: 

224 Path(args.output).write_text(dockerfile_text, encoding="utf-8") 

225 logger.info("Wrote Dockerfile to {}", args.output) 

226 else: 

227 sys.stdout.write(dockerfile_text) 

228 return 0 

229 

230 

231def _handle_build(args: argparse.Namespace) -> int: 

232 """Build the Docker image and emit the compose bundle beside the manifest.""" 

233 manifest_path = _resolve_manifest_path_argument(args.path) 

234 manifest, lockfile, raw_manifest_text = _load_and_verify_lockfile( 

235 manifest_path, 

236 _resolve_lock_path_argument(args.lock, manifest_path=manifest_path), 

237 ) 

238 _log_manifest_security_advisories(manifest) 

239 raw_lock_text = dump_lockfile(lockfile) 

240 dockerfile_text = render_dockerfile( 

241 manifest, 

242 lockfile, 

243 raw_manifest_text=raw_manifest_text, 

244 raw_lock_text=raw_lock_text, 

245 ) 

246 tag = args.tag or default_image_tag(manifest.project.name, manifest.project.version) 

247 build_image_with_args( 

248 dockerfile_text, 

249 tag, 

250 build_args={ 

251 "OPENCLAWENV_SKILL_SCAN_FORMAT": args.scan_format, 

252 "OPENCLAWENV_SKILL_SCAN_POLICY": args.scan_policy, 

253 "OPENCLAWENV_SKILL_SCAN_FAIL_ON_SEVERITY": args.scan_fail_on_severity, 

254 }, 

255 ) 

256 dockerfile_path, compose_path, env_path = _write_compose_bundle( 

257 manifest_path=manifest_path, 

258 manifest=manifest, 

259 lockfile=lockfile, 

260 image_tag=tag, 

261 raw_manifest_text=raw_manifest_text, 

262 raw_lock_text=raw_lock_text, 

263 dockerfile_text=dockerfile_text, 

264 ) 

265 logger.info("Wrote Dockerfile to {}", dockerfile_path) 

266 logger.info("Wrote docker-compose file to {}", compose_path) 

267 logger.info("Wrote secrets env file to {}", env_path) 

268 logger.info("Built image {}", tag) 

269 return 0 

270 

271 

272def _handle_export_compose(args: argparse.Namespace) -> int: 

273 """Render the compose/env bundle that references a previously locked image build.""" 

274 manifest_path = _resolve_manifest_path_argument(args.path) 

275 lock_path = _resolve_lock_path_argument(args.lock, manifest_path=manifest_path) 

276 manifest, _ = load_manifest(manifest_path) 

277 _log_manifest_security_advisories(manifest) 

278 manifest, lockfile, raw_manifest_text = _load_and_verify_lockfile(manifest_path, lock_path) 

279 tag = args.tag or default_image_tag(manifest.project.name, manifest.project.version) 

280 compose_path = Path(args.output) if args.output else None 

281 raw_lock_text = dump_lockfile(lockfile) 

282 dockerfile_text = render_dockerfile( 

283 manifest, 

284 lockfile, 

285 raw_manifest_text=raw_manifest_text, 

286 raw_lock_text=raw_lock_text, 

287 ) 

288 dockerfile_path, compose_path, env_path = _write_compose_bundle( 

289 manifest_path=manifest_path, 

290 manifest=manifest, 

291 lockfile=lockfile, 

292 image_tag=tag, 

293 compose_path=compose_path, 

294 raw_manifest_text=raw_manifest_text, 

295 raw_lock_text=raw_lock_text, 

296 dockerfile_text=dockerfile_text, 

297 ) 

298 logger.info("Wrote Dockerfile to {}", dockerfile_path) 

299 logger.info("Wrote docker-compose file to {}", compose_path) 

300 logger.info("Wrote secrets env file to {}", env_path) 

301 return 0 

302 

303 

304def _configure_logging() -> None: 

305 """Configure plain CLI logging via loguru.""" 

306 logger.remove() 

307 logger.add( 

308 sys.stdout, 

309 level="INFO", 

310 format="{message}", 

311 filter=lambda record: record["level"].no < 40, 

312 ) 

313 logger.add( 

314 sys.stderr, 

315 level="ERROR", 

316 format="{message}", 

317 ) 

318 

319 

320def _render_locked_dockerfile(*, manifest_path: str, lock_path: str) -> str: 

321 """Load the manifest and verified lockfile, then render the effective Dockerfile.""" 

322 manifest, lockfile, raw_manifest_text = _load_and_verify_lockfile( 

323 _resolve_manifest_path_argument(manifest_path), 

324 _resolve_lock_path_argument(lock_path, manifest_path=manifest_path), 

325 ) 

326 raw_lock_text = dump_lockfile(lockfile) 

327 return render_dockerfile( 

328 manifest, 

329 lockfile, 

330 raw_manifest_text=raw_manifest_text, 

331 raw_lock_text=raw_lock_text, 

332 ) 

333 

334 

335def _load_and_verify_lockfile( 

336 manifest_path: str, 

337 lock_path: str, 

338) -> tuple[Manifest, Lockfile, str]: 

339 """Load a manifest/lockfile pair and ensure the lock matches the current manifest.""" 

340 resolved_manifest_path = _resolve_manifest_path_argument(manifest_path) 

341 manifest, raw_manifest_text = load_manifest(resolved_manifest_path) 

342 lockfile = load_lockfile(_resolve_lock_path_argument(lock_path, manifest_path=resolved_manifest_path)) 

343 expected_hash = build_lockfile( 

344 manifest, 

345 raw_manifest_text, 

346 resolver=lambda _: { 

347 "digest": lockfile.base_image["digest"], 

348 "resolved_reference": lockfile.base_image["resolved_reference"], 

349 }, 

350 ).manifest_hash 

351 if expected_hash != lockfile.manifest_hash: 

352 raise OpenEnvError( 

353 "The lockfile does not match the current manifest. Run `clawopenenv lock` again." 

354 ) 

355 return manifest, lockfile, raw_manifest_text 

356 

357 

358def _default_compose_path(manifest_path: str, agent_name: str) -> Path: 

359 """Return the default compose destination located next to the manifest.""" 

360 return Path(manifest_path).resolve().parent / default_compose_filename(agent_name) 

361 

362 

363def _resolve_manifest_path_argument(path: str | Path) -> str: 

364 """Resolve the manifest path, falling back to the legacy filename when present.""" 

365 candidate = Path(path) 

366 if candidate.name == DEFAULT_MANIFEST_FILENAME and not candidate.exists(): 366 ↛ 367line 366 didn't jump to line 367 because the condition on line 366 was never true

367 legacy = candidate.with_name(LEGACY_MANIFEST_FILENAME) 

368 if legacy.exists(): 

369 return str(legacy) 

370 return str(candidate) 

371 

372 

373def _resolve_lock_path_argument(lock_path: str | Path, *, manifest_path: str | Path | None = None) -> str: 

374 """Resolve the lockfile path, falling back to legacy or sibling lockfile names.""" 

375 candidate = Path(lock_path) 

376 search_paths: list[Path] = [] 

377 if candidate.name == DEFAULT_LOCKFILE_FILENAME: 377 ↛ 381line 377 didn't jump to line 381 because the condition on line 377 was always true

378 search_paths.append(candidate) 

379 search_paths.append(candidate.with_name(LEGACY_LOCKFILE_FILENAME)) 

380 else: 

381 search_paths.append(candidate) 

382 if manifest_path is not None: 382 ↛ 390line 382 didn't jump to line 390 because the condition on line 382 was always true

383 manifest_candidate = Path(manifest_path) 

384 search_paths.extend( 

385 [ 

386 manifest_candidate.with_name(DEFAULT_LOCKFILE_FILENAME), 

387 manifest_candidate.with_name(LEGACY_LOCKFILE_FILENAME), 

388 ] 

389 ) 

390 for path in search_paths: 390 ↛ 393line 390 didn't jump to line 393 because the loop on line 390 didn't complete

391 if path.exists(): 391 ↛ 390line 391 didn't jump to line 390 because the condition on line 391 was always true

392 return str(path) 

393 return str(candidate) 

394 

395 

396def _write_compose_bundle( 

397 *, 

398 manifest_path: str, 

399 manifest: Manifest, 

400 lockfile: Lockfile, 

401 image_tag: str, 

402 raw_manifest_text: str, 

403 raw_lock_text: str, 

404 compose_path: Path | None = None, 

405 dockerfile_text: str | None = None, 

406) -> tuple[Path | None, Path, Path]: 

407 """Write the Dockerfile, compose file, and env file associated with one bot build.""" 

408 compose_target = compose_path or _default_compose_path( 

409 manifest_path, 

410 manifest.openclaw.agent_name, 

411 ) 

412 dockerfile_target: Path | None = None 

413 if dockerfile_text is not None: 413 ↛ 416line 413 didn't jump to line 416 because the condition on line 413 was always true

414 dockerfile_target = compose_target.resolve().parent / "Dockerfile" 

415 dockerfile_target.write_text(dockerfile_text, encoding="utf-8") 

416 env_target = compose_target.resolve().parent / default_env_filename( 

417 manifest.openclaw.agent_name 

418 ) 

419 write_compose(compose_target, render_compose(manifest, image_tag)) 

420 source_env_path = secret_env_path(Path(manifest_path).resolve().parent) 

421 existing_values = load_secret_values(env_target) 

422 if source_env_path.exists(): 

423 existing_values.update(load_secret_values(source_env_path)) 

424 _log_runtime_env_advisories(existing_values) 

425 write_env_file( 

426 env_target, 

427 render_env_file(manifest, image_tag, existing_values=existing_values), 

428 ) 

429 materialize_runtime_mount_tree( 

430 compose_target.resolve().parent, 

431 manifest, 

432 lockfile, 

433 raw_manifest_text=raw_manifest_text, 

434 raw_lock_text=raw_lock_text, 

435 ) 

436 return dockerfile_target, compose_target, env_target 

437 

438 

439def _log_manifest_security_advisories(manifest: Manifest) -> None: 

440 """Emit non-blocking security warnings for risky manifest choices.""" 

441 for advisory in assess_manifest_security(manifest): 

442 logger.warning("security warning: {}", advisory) 

443 

444 

445def _log_runtime_env_advisories(values: dict[str, str]) -> None: 

446 """Emit non-blocking security warnings for risky runtime env overrides.""" 

447 for advisory in assess_runtime_env_security(values): 

448 logger.warning("security warning: {}", advisory)