Coverage for src / openenv / envfiles / project_env.py: 93.26%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-25 13:36 +0000

1"""Helpers for loading and updating the project root .env file.""" 

2 

3from __future__ import annotations 

4 

5import os 

6import re 

7from pathlib import Path 

8 

9from openenv.core.errors import ValidationError 

10 

11 

12PROJECT_ENV_FILENAME = ".env" 

13_ENV_KEY_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") 

14 

15 

16def project_env_path(root: str | Path) -> Path: 

17 """Return the project-level .env path.""" 

18 return Path(root).resolve() / PROJECT_ENV_FILENAME 

19 

20 

21def load_project_env(path: str | Path) -> dict[str, str]: 

22 """Load KEY=VALUE pairs from a project .env file.""" 

23 env_path = Path(path) 

24 if not env_path.exists(): 24 ↛ 26line 24 didn't jump to line 26 because the condition on line 24 was always true

25 return {} 

26 return dict(parse_project_env_text(env_path.read_text(encoding="utf-8"), label=str(env_path))) 

27 

28 

29def get_project_env_value(root: str | Path, key: str) -> str | None: 

30 """Resolve a configuration value from the OS env first, then the project .env.""" 

31 if value := os.environ.get(key): 

32 return value 

33 return load_project_env(project_env_path(root)).get(key) 

34 

35 

36def write_project_env_value(root: str | Path, key: str, value: str) -> Path: 

37 """Upsert a single KEY=VALUE entry in the project root .env file.""" 

38 if not _ENV_KEY_PATTERN.match(key): 

39 raise ValidationError(f"Invalid env var name: {key!r}") 

40 env_path = project_env_path(root) 

41 existing_text = env_path.read_text(encoding="utf-8") if env_path.exists() else "" 

42 rendered = upsert_project_env_text(existing_text, key, value) 

43 env_path.write_text(rendered, encoding="utf-8") 

44 return env_path 

45 

46 

47def parse_project_env_text(text: str, *, label: str) -> list[tuple[str, str]]: 

48 """Parse a project .env file.""" 

49 entries: list[tuple[str, str]] = [] 

50 seen: set[str] = set() 

51 for line_no, raw_line in enumerate(text.splitlines(), start=1): 51 ↛ 67line 51 didn't jump to line 67 because the loop on line 51 didn't complete

52 stripped = raw_line.strip() 

53 if not stripped or stripped.startswith("#"): 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true

54 continue 

55 if "=" not in raw_line: 

56 raise ValidationError(f"{label}:{line_no} must use KEY=VALUE syntax.") 

57 key, value = raw_line.split("=", 1) 

58 key = key.strip() 

59 if not _ENV_KEY_PATTERN.match(key): 

60 raise ValidationError( 

61 f"{label}:{line_no} has an invalid env var name: {key!r}" 

62 ) 

63 if key in seen: 

64 raise ValidationError(f"{label}:{line_no} duplicates env var {key}.") 

65 seen.add(key) 

66 entries.append((key, value)) 

67 return entries 

68 

69 

70def upsert_project_env_text(text: str, key: str, value: str) -> str: 

71 """Insert or replace a KEY=VALUE line while preserving other content.""" 

72 lines = text.splitlines() 

73 rendered_line = f"{key}={value}" 

74 updated = False 

75 result: list[str] = [] 

76 for raw_line in lines: 

77 stripped = raw_line.strip() 

78 if stripped and not stripped.startswith("#") and "=" in raw_line: 

79 current_key, _ = raw_line.split("=", 1) 

80 if current_key.strip() == key: 

81 result.append(rendered_line) 

82 updated = True 

83 continue 

84 result.append(raw_line) 

85 if not updated: 

86 if result and result[-1] != "": 

87 result.append("") 

88 result.append(rendered_line) 

89 return "\n".join(result).rstrip() + "\n"