These modules parse openclawenv.toml, normalize the internal model, render
manifests back to TOML, and generate openclawenv.lock.
openenv.manifests
Manifest parsing, serialization, and locking.
openenv.manifests.loader
Manifest parsing and validation.
load_manifest(path)
Read and parse a manifest from disk.
Source code in src/openenv/manifests/loader.py
def load_manifest(path: str | Path) -> tuple[Manifest, str]:
    """Load a manifest file and return it with its raw text.

    The file is read as UTF-8, decoded as TOML, and handed to
    ``parse_manifest`` with the file's directory as ``base_dir``. Secret
    refs returned by ``load_secret_refs`` for a sibling ``.env`` file are
    attached to the manifest when the manifest declares none itself.

    Returns:
        A ``(manifest, raw_text)`` pair, where ``raw_text`` is the file
        content exactly as read.

    Raises:
        ValidationError: If the file does not exist, is not valid TOML, or
            declares secret refs both inline and through the ``.env``
            sidecar.
    """
    source = Path(path)

    try:
        text = source.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise ValidationError(f"Manifest file not found: {source}") from exc

    try:
        document = tomllib.loads(text)
    except tomllib.TOMLDecodeError as exc:
        raise ValidationError(f"Invalid TOML in {source}: {exc}") from exc

    folder = source.parent
    manifest = parse_manifest(document, base_dir=folder)

    env_refs = load_secret_refs(folder / ".env")
    if not env_refs:
        return manifest, text

    # The two declaration styles are mutually exclusive.
    if manifest.runtime.secret_refs:
        raise ValidationError(
            "Declare secret refs either in runtime.secret_refs or in a sibling .env file, not both."
        )
    manifest.runtime.secret_refs = env_refs
    return manifest, text
parse_manifest(data, *, base_dir=None)
Convert parsed TOML into a strongly typed manifest.
Source code in src/openenv/manifests/loader.py
def parse_manifest(
    data: dict[str, Any],
    *,
    base_dir: Path | None = None,
) -> Manifest:
    """Convert parsed TOML into a strongly typed manifest.

    Args:
        data: Root table of the decoded manifest document.
        base_dir: Directory forwarded unchanged to ``_parse_agent_config``.
            NOTE(review): presumably used to resolve file references in the
            ``[agent]`` table; confirm against that helper.

    Returns:
        A ``Manifest`` built from the ``project``, ``runtime``, ``agent``,
        ``skills``, ``openclaw`` and ``access`` sections.

    Raises:
        ValidationError: If the root is not a table, ``schema_version`` is
            not the integer ``1``, ``access``/``skills``/``openclaw.tools``
            have the wrong type, or ``project.runtime`` is not
            ``"openclaw"``. The section helpers called here are expected to
            report their own field errors.
    """
    if not isinstance(data, dict):
        raise ValidationError("Manifest root must be a TOML table.")

    schema_version = data.get("schema_version")
    # ``True == 1`` and ``1.0 == 1`` in Python, so a bare ``!= 1`` comparison
    # accepts ``schema_version = true`` / ``1.0`` and stores that value on the
    # Manifest. Require a real integer.
    if (
        isinstance(schema_version, bool)
        or not isinstance(schema_version, int)
        or schema_version != 1
    ):
        raise ValidationError("schema_version must be set to 1.")

    project_table = _require_table(data, "project")
    runtime_table = _require_table(data, "runtime")
    agent_table = _require_table(data, "agent")
    openclaw_table = _require_table(data, "openclaw")

    access_table = data.get("access", {})
    # Check the type unconditionally: a falsy non-table (0, "", [], false)
    # must be rejected here rather than fail later on ``.get``.
    if not isinstance(access_table, dict):
        raise ValidationError("access must be a table when provided.")

    project = ProjectConfig(
        name=_require_string(project_table, "name"),
        version=_require_string(project_table, "version"),
        description=_require_string(project_table, "description"),
        runtime=_require_string(project_table, "runtime"),
    )
    if project.runtime != "openclaw":
        raise ValidationError("project.runtime must currently be 'openclaw'.")

    runtime = RuntimeConfig(
        base_image=_require_string(runtime_table, "base_image"),
        python_version=_require_string(runtime_table, "python_version"),
        system_packages=_string_list(
            runtime_table.get("system_packages", []),
            "runtime.system_packages",
        ),
        python_packages=_string_list(
            runtime_table.get("python_packages", []),
            "runtime.python_packages",
        ),
        node_packages=_string_list(
            runtime_table.get("node_packages", []),
            "runtime.node_packages",
        ),
        env=_string_map(runtime_table.get("env", {}), "runtime.env"),
        # Missing or empty values fall back to the defaults below.
        user=_optional_string(runtime_table.get("user"), "runtime.user") or "root",
        workdir=_optional_string(runtime_table.get("workdir"), "runtime.workdir")
        or "/workspace",
        secret_refs=_parse_secret_refs(runtime_table.get("secret_refs", [])),
    )
    _validate_runtime(runtime)

    agent = _parse_agent_config(agent_table, base_dir=base_dir)

    skills_raw = data.get("skills", [])
    if not isinstance(skills_raw, list):
        raise ValidationError("skills must be an array of tables.")
    # Indices are 1-based when passed to the per-skill parser.
    skills = [_parse_skill(item, index) for index, item in enumerate(skills_raw, start=1)]
    skills = ensure_mandatory_skills(skills)
    _validate_skill_names(skills)

    sandbox_table = _require_table(openclaw_table, "sandbox")
    tools_table = openclaw_table.get("tools", {})
    if not isinstance(tools_table, dict):
        raise ValidationError("openclaw.tools must be a table when provided.")

    openclaw = OpenClawConfig(
        agent_id=_require_string(openclaw_table, "agent_id"),
        agent_name=_require_string(openclaw_table, "agent_name"),
        workspace=_optional_string(openclaw_table.get("workspace"), "openclaw.workspace")
        or "/opt/openclaw/workspace",
        state_dir=_optional_string(openclaw_table.get("state_dir"), "openclaw.state_dir")
        or "/opt/openclaw",
        tools_allow=_string_list(tools_table.get("allow", []), "openclaw.tools.allow"),
        tools_deny=_string_list(tools_table.get("deny", []), "openclaw.tools.deny"),
        sandbox=SandboxConfig(
            mode=_require_string(sandbox_table, "mode"),
            scope=_require_string(sandbox_table, "scope"),
            workspace_access=_require_string(sandbox_table, "workspace_access"),
            network=_require_string(sandbox_table, "network"),
            read_only_root=_require_bool(sandbox_table, "read_only_root"),
        ),
        channels=_json_like_mapping(openclaw_table.get("channels", {}), "openclaw.channels"),
    )
    _validate_openclaw(openclaw)

    access = AccessConfig(
        websites=_string_list(access_table.get("websites", []), "access.websites"),
        databases=_string_list(access_table.get("databases", []), "access.databases"),
        notes=_string_list(access_table.get("notes", []), "access.notes"),
    )

    return Manifest(
        schema_version=schema_version,
        project=project,
        runtime=runtime,
        agent=agent,
        skills=skills,
        openclaw=openclaw,
        access=access,
    )
openenv.manifests.lockfile
Lockfile creation and serialization.
build_lockfile(manifest, raw_manifest_text, *, resolver=None)
Create a deterministic lockfile from a manifest.
Source code in src/openenv/manifests/lockfile.py
def build_lockfile(
    manifest: Manifest,
    raw_manifest_text: str,
    *,
    resolver: Callable[[str], dict[str, str]] | None = None,
) -> Lockfile:
    """Build the lockfile that pins a manifest's resolved inputs.

    Args:
        manifest: Parsed manifest to lock.
        raw_manifest_text: Manifest text as read from disk; only its SHA-256
            is recorded.
        resolver: Optional base-image resolver forwarded to
            ``resolve_base_image``.

    Returns:
        A version-1 ``Lockfile`` holding the manifest hash, the resolved base
        image, the resolved Python and Node requirements, the system package
        list, and the source snapshot.
    """
    runtime = manifest.runtime

    # Hash the compact, stable JSON form of the normalized manifest.
    canonical = stable_json_dumps(manifest.to_dict(), indent=None)
    digest_of_manifest = sha256_text(canonical)

    resolved_image = resolve_base_image(runtime.base_image, resolver=resolver)

    pinned_python = [_resolve_python_requirement(spec) for spec in runtime.python_packages]
    pinned_node = [_resolve_node_requirement(spec) for spec in runtime.node_packages]

    image_entry = {
        "digest": resolved_image["digest"],
        "reference": runtime.base_image,
        "resolved_reference": resolved_image["resolved_reference"],
    }
    # Copy so the lockfile does not share the manifest's list.
    system_packages = list(runtime.system_packages)
    snapshot = {
        **manifest.source_snapshot(),
        "raw_manifest_sha256": sha256_text(raw_manifest_text),
    }

    return Lockfile(
        lock_version=1,
        manifest_hash=digest_of_manifest,
        base_image=image_entry,
        python_packages=pinned_python,
        node_packages=pinned_node,
        system_packages=system_packages,
        source_snapshot=snapshot,
    )
dump_lockfile(lockfile)
Serialize a lockfile deterministically.
Source code in src/openenv/manifests/lockfile.py
def dump_lockfile(lockfile: Lockfile) -> str:
    """Return the lockfile as stable, 2-space-indented JSON ending in a newline."""
    payload = lockfile.to_dict()
    serialized = stable_json_dumps(payload, indent=2)
    return serialized + "\n"
load_lockfile(path)
Load a lockfile from JSON.
Source code in src/openenv/manifests/lockfile.py
def load_lockfile(path: str | Path) -> Lockfile:
    """Read a JSON lockfile from disk and validate it.

    Raises:
        ValidationError: If the file does not exist or does not contain valid
            JSON; ``parse_lockfile`` validates the decoded payload.
    """
    lock_path = Path(path)

    try:
        text = lock_path.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise ValidationError(f"Lockfile not found: {lock_path}") from exc

    try:
        payload = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValidationError(f"Invalid JSON in {lock_path}: {exc}") from exc

    return parse_lockfile(payload)
parse_lockfile(data)
Validate a parsed lockfile payload.
Source code in src/openenv/manifests/lockfile.py
def parse_lockfile(data: dict[str, Any]) -> Lockfile:
    """Validate a parsed lockfile payload.

    Args:
        data: Decoded JSON document for the lockfile.

    Returns:
        A ``Lockfile`` built from the payload's top-level fields.

    Raises:
        ValidationError: If the root is not an object, a required key is
            missing, ``lock_version`` is not the integer ``1``, or a
            top-level field has the wrong container type. ``manifest_hash``
            is checked by ``_require_string``.
    """
    if not isinstance(data, dict):
        raise ValidationError("Lockfile root must be an object.")

    required_keys = {
        "lock_version",
        "manifest_hash",
        "base_image",
        "python_packages",
        "node_packages",
        "system_packages",
        "source_snapshot",
    }
    # Sorted so the error message is deterministic.
    missing = sorted(required_keys - set(data))
    if missing:
        raise ValidationError(f"Lockfile is missing required keys: {', '.join(missing)}")

    lock_version = data["lock_version"]
    # JSON ``true`` decodes to ``True`` and ``True == 1`` (likewise
    # ``1.0 == 1``), so compare against a real integer.
    if (
        isinstance(lock_version, bool)
        or not isinstance(lock_version, int)
        or lock_version != 1
    ):
        raise ValidationError("lock_version must be set to 1.")

    # (key, required container type, wording used in the error message)
    expected_shapes = (
        ("base_image", dict, "an object"),
        ("python_packages", list, "a list"),
        ("node_packages", list, "a list"),
        ("system_packages", list, "a list"),
        ("source_snapshot", dict, "an object"),
    )
    for key, expected_type, label in expected_shapes:
        if not isinstance(data[key], expected_type):
            raise ValidationError(f"lockfile.{key} must be {label}.")

    return Lockfile(
        lock_version=lock_version,
        manifest_hash=_require_string(data, "manifest_hash"),
        base_image=data["base_image"],
        python_packages=data["python_packages"],
        node_packages=data["node_packages"],
        system_packages=data["system_packages"],
        source_snapshot=data["source_snapshot"],
    )
resolve_base_image(base_image, *, resolver=None)
Resolve the Docker base image to a content-addressed reference.
Source code in src/openenv/manifests/lockfile.py
def resolve_base_image(
base_image: str,
*,
resolver: Callable[[str], dict[str, str]] | None = None,
) -> dict[str, str]:
"""Resolve the Docker base image to a content-addressed reference."""
if resolver is not None:
return resolver(base_image)
digest_match = _DIGEST_PATTERN.match(base_image)
if digest_match:
return {
"digest": digest_match.group("digest"),
"resolved_reference": base_image,
}
try:
completed = _inspect_base_image(base_image)
except OSError as exc:
raise LockResolutionError(
"Docker is required to resolve an unpinned base image. "
"Pin runtime.base_image with @sha256 or make docker available."
) from exc
except subprocess.CalledProcessError as exc:
if _is_missing_local_image_error(exc):
completed = _pull_and_inspect_base_image(base_image)
else:
raise LockResolutionError(
"Unable to resolve runtime.base_image. "
"Pin it with @sha256 or ensure the image is present locally. "
f"Docker said: {_docker_error_detail(exc)}"
) from exc
try:
repo_digests = json.loads(completed.stdout.strip() or "[]")
except json.JSONDecodeError as exc:
raise LockResolutionError(
"Docker returned an unreadable RepoDigests payload while resolving the base image."
) from exc
if not repo_digests:
raise LockResolutionError(
"Docker did not return a RepoDigest for the base image. "
"Pin runtime.base_image with @sha256 for deterministic locks."
)
resolved_reference = repo_digests[0]
digest_match = _DIGEST_PATTERN.match(resolved_reference)
if digest_match is None:
raise LockResolutionError(
f"Resolved base image did not include a digest: {resolved_reference}"
)
digest = digest_match.group("digest")
return {
"digest": digest,
"resolved_reference": _attach_digest(base_image, digest),
}
write_lockfile(path, lockfile)
Write a lockfile to disk.
Source code in src/openenv/manifests/lockfile.py
def write_lockfile(path: str | Path, lockfile: Lockfile) -> None:
    """Serialize ``lockfile`` with ``dump_lockfile`` and write it to ``path`` as UTF-8."""
    target = Path(path)
    contents = dump_lockfile(lockfile)
    target.write_text(contents, encoding="utf-8")
openenv.manifests.writer
Manifest serialization helpers.
render_manifest(manifest)
Serialize a manifest dataclass into TOML.
Source code in src/openenv/manifests/writer.py
def render_manifest(manifest: Manifest) -> str:
    """Render a manifest as TOML text.

    Sections are emitted in a fixed order: ``schema_version``, ``[project]``,
    ``[runtime]``, one ``[[runtime.secret_refs]]`` per secret, ``[agent]``,
    one ``[[skills]]`` per skill, ``[access]`` (only when it has entries),
    ``[openclaw]``, ``[openclaw.sandbox]``, ``[openclaw.tools]`` and, when
    present, the ``openclaw.channels`` table. Each section is followed by a
    blank line.
    """
    project = manifest.project
    runtime = manifest.runtime
    agent = manifest.agent
    access = manifest.access
    openclaw = manifest.openclaw
    sandbox = openclaw.sandbox

    out: list[str] = [f"schema_version = {manifest.schema_version}", ""]

    out.append("[project]")
    for key in ("name", "version", "description", "runtime"):
        out += _render_kv(key, getattr(project, key))
    out.append("")

    out.append("[runtime]")
    for key in (
        "base_image",
        "python_version",
        "system_packages",
        "python_packages",
        "node_packages",
    ):
        out += _render_kv(key, getattr(runtime, key))
    # The env table is only written when it has entries.
    if runtime.env:
        out.append(f"env = {_render_inline_table(runtime.env)}")
    for key in ("user", "workdir"):
        out += _render_kv(key, getattr(runtime, key))
    out.append("")

    for secret in runtime.secret_refs:
        out.append("[[runtime.secret_refs]]")
        for key in ("name", "source", "required"):
            out += _render_kv(key, getattr(secret, key))
        out.append("")

    out.append("[agent]")
    # These three documents are always rendered.
    out += _render_agent_doc("agents_md", agent.agents_md_ref, agent.agents_md)
    out += _render_agent_doc("soul_md", agent.soul_md_ref, agent.soul_md)
    out += _render_agent_doc("user_md", agent.user_md_ref, agent.user_md)
    # These two are rendered only when their content is set.
    if agent.identity_md is not None:
        out += _render_agent_doc("identity_md", agent.identity_md_ref, agent.identity_md)
    if agent.tools_md is not None:
        out += _render_agent_doc("tools_md", agent.tools_md_ref, agent.tools_md)
    # A memory-seed reference, when set, is written instead of the seed value.
    memory_seed = (
        agent.memory_seed if agent.memory_seed_ref is None else agent.memory_seed_ref
    )
    out += _render_kv("memory_seed", memory_seed)
    out.append("")

    for entry in manifest.skills:
        out.append("[[skills]]")
        out += _render_kv("name", entry.name)
        out += _render_kv("description", entry.description)
        if entry.source is not None:
            out += _render_kv("source", entry.source)
        if entry.content is not None:
            out += _render_kv("content", entry.content)
        if entry.assets:
            out.append(f"assets = {_render_inline_table(entry.assets)}")
        out.append("")

    if access.websites or access.databases or access.notes:
        out.append("[access]")
        for key in ("websites", "databases", "notes"):
            out += _render_kv(key, getattr(access, key))
        out.append("")

    out.append("[openclaw]")
    for key in ("agent_id", "agent_name", "workspace", "state_dir"):
        out += _render_kv(key, getattr(openclaw, key))
    out.append("")

    out.append("[openclaw.sandbox]")
    for key in ("mode", "scope", "workspace_access", "network", "read_only_root"):
        out += _render_kv(key, getattr(sandbox, key))
    out.append("")

    out.append("[openclaw.tools]")
    out += _render_kv("allow", openclaw.tools_allow)
    out += _render_kv("deny", openclaw.tools_deny)
    out.append("")

    if openclaw.channels:
        out += _render_table("openclaw.channels", openclaw.channels)
        out.append("")

    return "\n".join(out)