memoria/scripts/fs_utils.py
agent-admin 39ab2a8b6f fork: MIT LICENSE + foundation patches (atomicity, locking, safety)
This is the initial fork commit for agent-admin/memoria, a production
hardening of coleam00/claude-memory-compiler. It addresses all four P0
findings from the bug audit (atomic state writes, file locking on
daily log appends, subprocess detachment, path-traversal guard) plus
several P1s (aliased wikilinks, timezone wiring, staleness-based
compile trigger, SDK retry with backoff, file-handle context manager).

File-level changes:

- LICENSE — MIT (fork is self-declared FOSS; upstream has no LICENSE
  file but author has stated FOSS intent).

- pyproject.toml — renamed project to `memoria`, removed unused
  python-dotenv dependency, added optional `test` dep group.

- scripts/fs_utils.py — NEW module containing the primitives that
  the other patches rely on:
    * atomic_write_text(path, content): tmp + fsync + os.replace;
      interrupted writes leave the target unchanged.
    * locked_append_text(path, content): fcntl.flock (POSIX) /
      msvcrt.locking (Windows) exclusive lock around the write so
      concurrent callers never interleave.
    * extract_wikilinks / parse_wikilink: strip [[target|display]]
      aliases correctly (fixes upstream issues #7 and #8).
    * safe_article_path(link, base): resolves a wikilink slug inside
      a base dir or returns None (path traversal guard).
    * load_json_with_recovery(path, default): on corruption, moves
      the bad file aside with a timestamped .bak-YYYYMMDDTHHMMSSZ
      suffix, logs a warning, and returns the default. This replaces
      the silent `{}` return, which would otherwise trigger a full
      recompile.

- scripts/utils.py — save_state/load_state now use atomic writes and
  corruption recovery; wiki_article_exists + count_inbound_links now
  alias-aware via fs_utils helpers.

- scripts/config.py — TIMEZONE is now wired via zoneinfo.ZoneInfo
  and used by now_iso/today_iso (previously defined but ignored).
  Overridable via MEMORIA_TZ env var. Unknown zones log a warning
  and fall back to system local time rather than crashing.
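
The fallback behaviour described for `scripts/config.py` could look roughly like the sketch below. It is an illustration under assumptions, not the fork's actual code: `resolve_timezone` and `now_local` are hypothetical names, and only `MEMORIA_TZ`, `zoneinfo.ZoneInfo`, and the warn-and-fall-back policy come from the description above.

```python
from __future__ import annotations

import logging
import os
from datetime import datetime, tzinfo
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

log = logging.getLogger(__name__)


def resolve_timezone(name: str | None = None) -> tzinfo | None:
    """Return the configured zone, or None to mean "use system local time".

    Hypothetical helper: reads MEMORIA_TZ when no name is passed.
    """
    name = name or os.environ.get("MEMORIA_TZ")
    if not name:
        return None
    try:
        return ZoneInfo(name)
    except (ZoneInfoNotFoundError, ValueError):
        # Unknown or malformed zone key: warn and fall back instead of crashing.
        log.warning("Unknown timezone %r; falling back to system local time.", name)
        return None


def now_local(tz: tzinfo | None) -> datetime:
    """Timezone-aware "now" in `tz`, or in the system local zone if tz is None."""
    return datetime.now(tz) if tz is not None else datetime.now().astimezone()
```

Returning `None` for "no usable zone" keeps the fallback decision in one place; callers such as `now_iso`/`today_iso` would only need to call `now_local(...)`.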

- scripts/flush.py —
    * save_flush_state / load_flush_state use atomic + recovery.
    * append_to_daily_log uses locked_append_text; concurrent flush
      + pre-compact calls can no longer interleave log entries.
    * run_flush retries SDK failures up to MAX_SDK_ATTEMPTS=3 with
      exponential backoff (2s, 4s) before returning FLUSH_ERROR.
    * On FLUSH_ERROR, main() preserves the context file and does NOT
      update dedup state — the next flush retries cleanly instead of
      the failure being silently swallowed.
    * Explicit model="haiku" for flush (short summarization task).
    * maybe_trigger_compilation replaced: 6 PM wall-clock gate is
      gone; trigger is now staleness-based (hash changed AND
      COMPILE_INTERVAL_MIN elapsed since last compile). Configurable
      via MEMORIA_COMPILE_INTERVAL_MIN. Uses _now_local() from
      config so the clock respects the configured timezone.
    * compile.log handle uses a `with open()` context manager so the
      fd is always cleaned up, even if Popen throws.
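
The retry and trigger rules listed above can be sketched as follows. This is a minimal illustration, not the code in `scripts/flush.py`: `call_with_backoff`, `should_compile`, and their parameters are hypothetical, the `sleep` argument is injectable purely so the example can be exercised without waiting, and the 30-minute interval is a placeholder rather than the project's default.

```python
from __future__ import annotations

import time
from datetime import datetime, timedelta
from typing import Callable, TypeVar

T = TypeVar("T")

MAX_SDK_ATTEMPTS = 3  # value from the commit message; pauses are 2s, then 4s


def call_with_backoff(
    fn: Callable[[], T],
    attempts: int = MAX_SDK_ATTEMPTS,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying on any Exception with 2s, 4s, ... pauses."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts; the caller maps this to its error result
            sleep(2.0 ** attempt)
    raise ValueError("attempts must be >= 1")


def should_compile(
    current_hash: str,
    last_hash: str | None,
    last_compile: datetime | None,
    now: datetime,
    interval_min: int = 30,  # placeholder value, not the project's default
) -> bool:
    """Staleness gate: content changed AND enough time since the last compile."""
    if current_hash == last_hash:
        return False
    if last_compile is None:
        return True
    return now - last_compile >= timedelta(minutes=interval_min)
```

Both conditions in `should_compile` must hold, so an unchanged log never triggers a compile and a rapidly changing one triggers at most one compile per interval.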

- hooks/session-end.py, hooks/pre-compact.py — subprocess.Popen now
  passes start_new_session=True on POSIX, detaching flush.py from
  the hook's process group so it survives post-hook SIGHUP. This
  fixes the intermittent data-loss failure mode in which the flush
  subprocess was killed in the middle of an LLM call.
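
A hook could launch the flush step along these lines. This is a sketch under assumptions, not the hooks' actual code: `detach_kwargs` and `spawn_detached` are hypothetical names, the redirection to `DEVNULL` is an illustrative choice, and on non-POSIX platforms the sketch simply passes no extra flags.

```python
from __future__ import annotations

import os
import subprocess
import sys


def detach_kwargs() -> dict[str, bool]:
    """Extra Popen arguments that put the child in its own session on POSIX."""
    # start_new_session=True makes the child call setsid(), so signals sent to
    # the hook's session or process group (such as SIGHUP) do not reach it.
    return {"start_new_session": True} if os.name == "posix" else {}


def spawn_detached(argv: list[str]) -> subprocess.Popen:
    """Start argv without tying its lifetime to the calling hook."""
    return subprocess.Popen(
        argv,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        **detach_kwargs(),
    )


if __name__ == "__main__":
    # Stand-in for launching flush.py: a child that exits immediately.
    child = spawn_detached([sys.executable, "-c", "pass"])
    print("spawned pid", child.pid)
```

The hook itself would not wait on the child; the point of the new session is that the child may outlive the hook.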

Tests (formal acceptance suite still to come in this phase): each
helper verified via unit exercise in scratch directories — atomic
roundtrip, corruption recovery with .bak creation, alias parsing,
path-traversal rejection.

Upstream issue mapping: #3/#5/#9 addressed by the next commit
(compile.py + query.py scaling fix). #7/#8 addressed here via
alias-aware helpers. License (#11) resolved via MIT LICENSE.
2026-04-24 17:44:07 -04:00

248 lines
8.5 KiB
Python

"""Filesystem + state-integrity helpers for memoria.
Provides the primitives the rest of the code uses to avoid the common
failure modes: interrupted writes truncating state files, concurrent
appends interleaving into daily logs, and LLM-authored wikilink slugs
escaping the knowledge directory.

All helpers are defensive by design; callers should NOT wrap these in
their own try/except for the ordinary paths. Specific exception types
(StateCorruptionError, UnsafePathError) are defined for conditions the
caller may want to handle, although the helpers in this module currently
report those conditions by return value (None or the supplied default)
rather than raising. Everything else bubbles.
"""
from __future__ import annotations
import json
import logging
import os
import re
import sys
import tempfile
from pathlib import Path
from typing import Any

# ── File locking ────────────────────────────────────────────────────────
# POSIX: fcntl.flock (advisory; the lock belongs to the open file description).
# Windows: msvcrt.locking (mandatory, byte-range). Falls back to best-effort
# (no lock) with a warning if neither is available; should never happen.

_PLATFORM_POSIX = sys.platform != "win32"

if _PLATFORM_POSIX:
    import fcntl

    def _lock_exclusive(fd: int) -> None:
        fcntl.flock(fd, fcntl.LOCK_EX)

    def _unlock(fd: int) -> None:
        fcntl.flock(fd, fcntl.LOCK_UN)

else:
    try:
        import msvcrt  # type: ignore[import-not-found]

        def _lock_exclusive(fd: int) -> None:
            # msvcrt.locking acts on bytes starting at the current file position,
            # so seek to offset 0 first: every caller then contends for the same
            # byte. Append-mode writes still land at the end of the file.
            os.lseek(fd, 0, os.SEEK_SET)
            msvcrt.locking(fd, msvcrt.LK_LOCK, 1)  # type: ignore[attr-defined]

        def _unlock(fd: int) -> None:
            # Unlock the same byte that _lock_exclusive locked.
            os.lseek(fd, 0, os.SEEK_SET)
            msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)  # type: ignore[attr-defined]

    except ImportError:  # pragma: no cover
        def _lock_exclusive(fd: int) -> None:
            logging.warning("File locking unavailable on this platform; concurrent writes may corrupt data.")

        def _unlock(fd: int) -> None:
            pass

# ── Atomic writes ───────────────────────────────────────────────────────


def atomic_write_text(path: Path, content: str, encoding: str = "utf-8") -> None:
    """Write text to `path` such that the file is either fully written or unchanged.

    Writes to a temp file in the same directory, fsyncs the file and (best-effort)
    the parent directory, then atomically renames into place via os.replace. On
    crash during write, `path` retains its pre-call contents; a `.<name>.*.tmp`
    file may be left behind for manual cleanup.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # NamedTemporaryFile in same dir so os.replace stays same-filesystem.
    tmp = tempfile.NamedTemporaryFile(
        mode="w",
        encoding=encoding,
        dir=str(path.parent),
        prefix=f".{path.name}.",
        suffix=".tmp",
        delete=False,
    )
    try:
        tmp.write(content)
        tmp.flush()
        os.fsync(tmp.fileno())
        tmp.close()
        os.replace(tmp.name, path)
        _fsync_dir_best_effort(path.parent)
    except Exception:
        # Best-effort cleanup of the temp file on failure. Close the handle
        # first (a no-op if already closed) so the unlink also works on Windows.
        try:
            tmp.close()
        except OSError:
            pass
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        raise


def _fsync_dir_best_effort(directory: Path) -> None:
    """Fsync the containing directory so the rename is durable on POSIX.

    Silently no-ops on Windows (os.open of a directory isn't supported there).
    """
    if not _PLATFORM_POSIX:
        return
    try:
        dir_fd = os.open(str(directory), os.O_RDONLY)
    except OSError:
        return
    try:
        os.fsync(dir_fd)
    except OSError:
        pass
    finally:
        os.close(dir_fd)

# ── Locked append ───────────────────────────────────────────────────────


def locked_append_text(path: Path, content: str, encoding: str = "utf-8") -> None:
    """Append `content` to `path` under an exclusive file lock.

    Multiple processes calling this concurrently will serialize; writes from
    different callers never interleave within one call's content. The lock is
    advisory on POSIX (cooperative: all writers must use this helper).
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # 'a' mode: O_APPEND makes each write() land at the current end of file, but
    # a large entry may reach the OS as several write() calls (or short writes),
    # and another appender could slip in between them. The lock guarantees
    # full-entry atomicity regardless of size.
    with open(path, "a", encoding=encoding) as f:
        fd = f.fileno()
        _lock_exclusive(fd)
        try:
            f.write(content)
            f.flush()
            os.fsync(fd)
        finally:
            _unlock(fd)

# ── Wikilink parsing ────────────────────────────────────────────────────
# Matches [[target]] and [[target|display]]. Returns just `target`.

_WIKILINK_RE = re.compile(r"\[\[([^\]|]+)(?:\|[^\]]*)?\]\]")


def extract_wikilinks(content: str) -> list[str]:
    """Return all wikilink targets from `content`, aliases stripped.

    `[[concepts/foo]]` → "concepts/foo"
    `[[concepts/foo|Display]]` → "concepts/foo"
    """
    return _WIKILINK_RE.findall(content)


def parse_wikilink(raw: str) -> str:
    """Strip the pipe-alias suffix from a raw wikilink target, if any."""
    pipe = raw.find("|")
    return raw[:pipe].strip() if pipe != -1 else raw.strip()

# ── Path safety ─────────────────────────────────────────────────────────


class UnsafePathError(ValueError):
    """Raised when a path escapes its allowed base directory."""


def safe_article_path(link: str, knowledge_dir: Path) -> Path | None:
    """Resolve a wikilink slug to a path inside `knowledge_dir`.

    Strips any `|display` alias, appends `.md`, resolves the path, and asserts
    it remains inside `knowledge_dir`. Returns None if the link is empty/invalid
    or the resolved path escapes the base.

    Does NOT check whether the file exists; use `path.exists()` separately.
    """
    slug = parse_wikilink(link)
    if not slug or slug.startswith("/") or "\0" in slug:
        return None
    base = knowledge_dir.resolve()
    candidate = (knowledge_dir / f"{slug}.md").resolve()
    try:
        candidate.relative_to(base)
    except ValueError:
        return None
    return candidate

# ── JSON with recovery ──────────────────────────────────────────────────


class StateCorruptionError(RuntimeError):
    """Raised when a state file is unreadable or not valid JSON.

    Attribute `backup_path` indicates where the corrupted file was moved.
    Attribute `default` contains the default state the caller should use.
    """

    def __init__(self, message: str, backup_path: Path | None, default: Any):
        super().__init__(message)
        self.backup_path = backup_path
        self.default = default


def load_json_with_recovery(
    path: Path,
    default: Any,
    *,
    logger: logging.Logger | None = None,
) -> Any:
    """Load JSON from `path`; on corruption, move aside + return `default`.

    Preserves the corrupted file at `<path>.bak-<timestamp>` so operators
    can inspect what was lost. Logs a warning via the provided logger
    (or the root logger) with the backup path.
    """
    path = Path(path)
    if not path.exists():
        return default
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError, UnicodeDecodeError) as exc:
        log = logger or logging.getLogger()
        from datetime import datetime, timezone

        ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        backup = path.with_suffix(path.suffix + f".bak-{ts}")
        try:
            os.replace(path, backup)
            log.warning(
                "State file corruption at %s: %s. Moved to %s; using default.",
                path,
                exc,
                backup,
            )
        except OSError as move_exc:
            log.error(
                "State file corruption at %s: %s. Failed to back up: %s. Using default.",
                path,
                exc,
                move_exc,
            )
            backup = None
        return default