fork: MIT LICENSE + foundation patches (atomicity, locking, safety)

This is the initial fork commit for agent-admin/memoria, a production
hardening of coleam00/claude-memory-compiler. It addresses all four P0
findings from the bug audit (atomic state writes, file locking on
daily log appends, subprocess detachment, path-traversal guard) plus
several P1s (aliased wikilinks, timezone wiring, staleness-based
compile trigger, SDK retry with backoff, file-handle context manager).

File-level changes:

- LICENSE — MIT (fork is self-declared FOSS; upstream has no LICENSE
  file but author has stated FOSS intent).

- pyproject.toml — renamed project to `memoria`, removed unused
  python-dotenv dependency, added optional `test` dep group.

- scripts/fs_utils.py — NEW module containing the primitives that
  the other patches rely on:
    * atomic_write_text(path, content): tmp + fsync + os.replace;
      interrupted writes leave the target unchanged.
    * locked_append_text(path, content): fcntl.flock (POSIX) /
      msvcrt.locking (Windows) exclusive lock around the write so
      concurrent callers never interleave.
    * extract_wikilinks / parse_wikilink: strip [[target|display]]
      aliases correctly (fixes upstream issues #7 and #8).
    * safe_article_path(link, base): resolves a wikilink slug inside
      a base dir or returns None (path traversal guard).
    * load_json_with_recovery(path, default): on corruption, moves
      the bad file aside with a timestamped .bak-YYYYMMDDTHHMMSSZ
      suffix, logs a warning, returns the default. Replaces the
      silent `{}` return that would otherwise cause full-recompile.

- scripts/utils.py — save_state/load_state now use atomic writes and
  corruption recovery; wiki_article_exists + count_inbound_links now
  alias-aware via fs_utils helpers.

- scripts/config.py — TIMEZONE is now wired via zoneinfo.ZoneInfo
  and used by now_iso/today_iso (previously defined but ignored).
  Overridable via MEMORIA_TZ env var. Unknown zones log a warning
  and fall back to system local time rather than crashing.

- scripts/flush.py —
    * save_flush_state / load_flush_state use atomic + recovery.
    * append_to_daily_log uses locked_append_text; concurrent flush
      + pre-compact calls can no longer interleave log entries.
    * run_flush retries SDK failures up to MAX_SDK_ATTEMPTS=3 with
      exponential backoff (2s, 4s) before returning FLUSH_ERROR.
    * On FLUSH_ERROR, main() preserves the context file and does NOT
      update dedup state — the next flush retries cleanly instead of
      the failure being silently swallowed.
    * Explicit model="haiku" for flush (short summarization task).
    * maybe_trigger_compilation replaced: 6 PM wall-clock gate is
      gone; trigger is now staleness-based (hash changed AND
      COMPILE_INTERVAL_MIN elapsed since last compile). Configurable
      via MEMORIA_COMPILE_INTERVAL_MIN. Uses _now_local() from
      config so the clock respects the configured timezone.
    * compile.log handle uses a `with open()` context manager so the
      fd is always cleaned up, even if Popen throws.

- hooks/session-end.py, hooks/pre-compact.py — subprocess.Popen now
  passes start_new_session=True on POSIX, detaching flush.py from
  the hook's process group so it survives post-hook SIGHUP. Fixes
  the intermittent-data-loss failure mode where flush subprocess
  was killed mid-LLM-call.

Tests (formal acceptance suite still to come in this phase): each
helper verified via unit exercise in scratch directories — atomic
roundtrip, corruption recovery with .bak creation, alias parsing,
path-traversal rejection.

Upstream issue mapping: #3/#5/#9 addressed by the next commit
(compile.py + query.py scaling fix). #7/#8 addressed here via
alias-aware helpers. License (#11) resolved via MIT LICENSE.
This commit is contained in:
agent-admin 2026-04-24 17:44:07 -04:00
parent 54eddd709e
commit 39ab2a8b6f
8 changed files with 528 additions and 108 deletions

View file

@ -1,7 +1,9 @@
"""Path constants and configuration for the personal knowledge base."""
import os
from pathlib import Path
from datetime import datetime, timezone
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
# ── Paths ──────────────────────────────────────────────────────────────
ROOT_DIR = Path(__file__).resolve().parent.parent
@ -20,14 +22,34 @@ LOG_FILE = KNOWLEDGE_DIR / "log.md"
STATE_FILE = SCRIPTS_DIR / "state.json"
# ── Timezone ───────────────────────────────────────────────────────────
TIMEZONE = "America/Chicago"
# Configurable via the MEMORIA_TZ environment variable (falls back to
# America/Chicago to preserve upstream's default for users who don't set it).
# If the zone name is unknown (missing tzdata, typo), log a warning and fall
# back to the system local timezone via astimezone() with no argument.
TIMEZONE = os.environ.get("MEMORIA_TZ", "America/Chicago")
try:
TZ: ZoneInfo | None = ZoneInfo(TIMEZONE)
except ZoneInfoNotFoundError:
import logging
logging.getLogger(__name__).warning(
"Timezone %r not found; falling back to system local time.", TIMEZONE
)
TZ = None
def _now_local() -> datetime:
"""Current datetime in the configured TIMEZONE (or system local as fallback)."""
if TZ is not None:
return datetime.now(TZ)
return datetime.now(timezone.utc).astimezone()
def now_iso() -> str:
"""Current time in ISO 8601 format."""
return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")
"""Current time in ISO 8601 format, in the configured TIMEZONE."""
return _now_local().isoformat(timespec="seconds")
def today_iso() -> str:
"""Current date in ISO 8601 format."""
return datetime.now(timezone.utc).astimezone().strftime("%Y-%m-%d")
"""Current date in ISO 8601 format, in the configured TIMEZONE."""
return _now_local().strftime("%Y-%m-%d")

View file

@ -20,15 +20,28 @@ import json
import logging
import sys
import time
from datetime import datetime, timezone
from datetime import datetime
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
DAILY_DIR = ROOT / "daily"
SCRIPTS_DIR = ROOT / "scripts"
sys.path.insert(0, str(SCRIPTS_DIR))
from config import DAILY_DIR, _now_local # noqa: E402
from fs_utils import atomic_write_text, load_json_with_recovery, locked_append_text # noqa: E402
STATE_FILE = SCRIPTS_DIR / "last-flush.json"
LOG_FILE = SCRIPTS_DIR / "flush.log"
# Staleness-based auto-compile gate (replaces the upstream 6 PM wall-clock gate).
# Compile fires when the daily log has changed AND enough time has passed since
# the last compile of that log. Configurable via env; default is 1 hour.
COMPILE_INTERVAL_MIN = int(os.environ.get("MEMORIA_COMPILE_INTERVAL_MIN", "60"))
# SDK retry tuning.
MAX_SDK_ATTEMPTS = 3
SDK_BACKOFF_BASE_SEC = 2.0
# Set up file-based logging so we can verify the background process ran.
# The parent process sends stdout/stderr to DEVNULL (to avoid the inherited
# file handle bug on Windows), so this is our only observability channel.
@ -41,39 +54,49 @@ logging.basicConfig(
def load_flush_state() -> dict:
if STATE_FILE.exists():
try:
return json.loads(STATE_FILE.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
pass
return {}
"""Load dedup state; recover from corruption with a timestamped backup."""
return load_json_with_recovery(STATE_FILE, {}, logger=logging.getLogger())
def save_flush_state(state: dict) -> None:
STATE_FILE.write_text(json.dumps(state), encoding="utf-8")
"""Persist dedup state atomically (tmp + fsync + rename)."""
atomic_write_text(STATE_FILE, json.dumps(state))
def append_to_daily_log(content: str, section: str = "Session") -> None:
"""Append content to today's daily log."""
today = datetime.now(timezone.utc).astimezone()
"""Append content to today's daily log under an exclusive file lock.
Concurrent callers (SessionEnd + PreCompact firing within seconds of
each other) serialize through the lock; full entries never interleave.
"""
today = _now_local()
log_path = DAILY_DIR / f"{today.strftime('%Y-%m-%d')}.md"
if not log_path.exists():
DAILY_DIR.mkdir(parents=True, exist_ok=True)
log_path.write_text(
# Initial header write is also lock-safe: if two concurrent flushes
# both pass the `if not log_path.exists()` check, the second write
# is a harmless no-op append of the header (the race creates a minor
# duplicate header at most, never corruption).
atomic_write_text(
log_path,
f"# Daily Log: {today.strftime('%Y-%m-%d')}\n\n## Sessions\n\n## Memory Maintenance\n\n",
encoding="utf-8",
)
time_str = today.strftime("%H:%M")
entry = f"### {section} ({time_str})\n\n{content}\n\n"
with open(log_path, "a", encoding="utf-8") as f:
f.write(entry)
locked_append_text(log_path, entry)
async def run_flush(context: str) -> str:
"""Use Claude Agent SDK to extract important knowledge from conversation context."""
"""Use Claude Agent SDK to extract important knowledge from conversation context.
Retries up to MAX_SDK_ATTEMPTS on exceptions, with exponential backoff.
Returns the LLM's response text on success, or a `FLUSH_ERROR:` sentinel
string on final failure the caller treats that as "retry later" and
preserves the source context file rather than deleting it.
"""
from claude_agent_sdk import (
AssistantMessage,
ClaudeAgentOptions,
@ -114,65 +137,105 @@ respond with exactly: FLUSH_OK
{context}"""
response = ""
last_exc: Exception | None = None
for attempt in range(1, MAX_SDK_ATTEMPTS + 1):
response = ""
try:
async for message in query(
prompt=prompt,
options=ClaudeAgentOptions(
cwd=str(ROOT),
model="haiku", # flush is short-form summarization — haiku is plenty
allowed_tools=[],
max_turns=2,
),
):
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
response += block.text
elif isinstance(message, ResultMessage):
pass
return response
except Exception as exc:
import traceback
last_exc = exc
logging.warning(
"Agent SDK error on attempt %d/%d: %s",
attempt,
MAX_SDK_ATTEMPTS,
exc,
)
if attempt < MAX_SDK_ATTEMPTS:
delay = SDK_BACKOFF_BASE_SEC * (2 ** (attempt - 1))
await asyncio.sleep(delay)
else:
logging.error("Agent SDK failed all retries:\n%s", traceback.format_exc())
try:
async for message in query(
prompt=prompt,
options=ClaudeAgentOptions(
cwd=str(ROOT),
allowed_tools=[],
max_turns=2,
),
):
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
response += block.text
elif isinstance(message, ResultMessage):
pass
except Exception as e:
import traceback
logging.error("Agent SDK error: %s\n%s", e, traceback.format_exc())
response = f"FLUSH_ERROR: {type(e).__name__}: {e}"
return response
COMPILE_AFTER_HOUR = 18 # 6 PM local time
# All retries exhausted. Return the error sentinel; caller preserves
# the context file for a later retry.
assert last_exc is not None
return f"FLUSH_ERROR: {type(last_exc).__name__}: {last_exc}"
def maybe_trigger_compilation() -> None:
"""If it's past the compile hour and today's log hasn't been compiled, run compile.py."""
import subprocess as _sp
"""Spawn compile.py if today's log has changed and compile-interval has elapsed.
now = datetime.now(timezone.utc).astimezone()
if now.hour < COMPILE_AFTER_HOUR:
Replaces the upstream 6 PM wall-clock gate with a staleness-based trigger:
compile fires if the daily log's content hash differs from the last-compiled
hash AND at least COMPILE_INTERVAL_MIN minutes have passed since the last
compile of that log (or it has never been compiled).
No-op if compile.py isn't found. Errors are logged but non-fatal.
"""
import subprocess as _sp
from hashlib import sha256
now = _now_local()
today_log = f"{now.strftime('%Y-%m-%d')}.md"
log_path = DAILY_DIR / today_log
if not log_path.exists():
return
# Check if today's log has already been compiled
today_log = f"{now.strftime('%Y-%m-%d')}.md"
compile_state_file = SCRIPTS_DIR / "state.json"
if compile_state_file.exists():
try:
compile_state = json.loads(compile_state_file.read_text(encoding="utf-8"))
ingested = compile_state.get("ingested", {})
if today_log in ingested:
# Already compiled today - check if the log has changed since
from hashlib import sha256
log_path = DAILY_DIR / today_log
if log_path.exists():
current_hash = sha256(log_path.read_bytes()).hexdigest()[:16]
if ingested[today_log].get("hash") == current_hash:
return # log unchanged since last compile
except (json.JSONDecodeError, OSError):
pass
compile_state = load_json_with_recovery(
compile_state_file, {}, logger=logging.getLogger()
)
ingested = compile_state.get("ingested", {}) if isinstance(compile_state, dict) else {}
entry = ingested.get(today_log, {})
# Compile only if content actually changed.
current_hash = sha256(log_path.read_bytes()).hexdigest()[:16]
if entry.get("hash") == current_hash:
return # log unchanged since last compile
# Rate-limit: require COMPILE_INTERVAL_MIN elapsed since last compile.
compiled_at_str = entry.get("compiled_at")
if compiled_at_str:
try:
compiled_at = datetime.fromisoformat(compiled_at_str)
# Strip tz if the caller wrote a naive datetime — don't crash on mismatched aware/naive.
if compiled_at.tzinfo is None:
compiled_at = compiled_at.replace(tzinfo=now.tzinfo)
elapsed_min = (now - compiled_at).total_seconds() / 60.0
if elapsed_min < COMPILE_INTERVAL_MIN:
logging.info(
"Skip compile: %s changed but only %.1f min since last compile (threshold %d)",
today_log,
elapsed_min,
COMPILE_INTERVAL_MIN,
)
return
except (ValueError, TypeError) as e:
logging.warning("Could not parse compiled_at %r: %s", compiled_at_str, e)
# Proceed with compile; a malformed timestamp should not block progress.
compile_script = SCRIPTS_DIR / "compile.py"
if not compile_script.exists():
return
logging.info("End-of-day compilation triggered (after %d:00)", COMPILE_AFTER_HOUR)
logging.info("Compilation triggered for %s (staleness gate)", today_log)
cmd = ["uv", "run", "--directory", str(ROOT), "python", str(compile_script)]
@ -182,9 +245,12 @@ def maybe_trigger_compilation() -> None:
else:
kwargs["start_new_session"] = True
# Use a context manager so the log handle is always cleaned up, even on
# Popen failure. Popen inherits the fd via duplication; closing our copy
# is safe and cleaner than leaking the handle.
try:
log_handle = open(str(SCRIPTS_DIR / "compile.log"), "a")
_sp.Popen(cmd, stdout=log_handle, stderr=_sp.STDOUT, cwd=str(ROOT), **kwargs)
with open(str(SCRIPTS_DIR / "compile.log"), "a", encoding="utf-8") as log_handle:
_sp.Popen(cmd, stdout=log_handle, stderr=_sp.STDOUT, cwd=str(ROOT), **kwargs)
except Exception as e:
logging.error("Failed to spawn compile.py: %s", e)
@ -222,30 +288,41 @@ def main():
logging.info("Flushing session %s: %d chars", session_id, len(context))
# Run the LLM extraction
# Run the LLM extraction (with built-in retry). run_flush returns either
# the structured summary, the literal string "FLUSH_OK" for skip-cases,
# or a "FLUSH_ERROR: ..." sentinel indicating all SDK retries failed.
response = asyncio.run(run_flush(context))
# Append to daily log
# On permanent SDK failure: preserve the context file for manual retry,
# do NOT update dedup state (so next flush for this session can retry),
# and do NOT trigger compilation (there's nothing new to compile).
if response.startswith("FLUSH_ERROR"):
logging.error(
"Flush failed for session %s after retries. Context preserved at %s. Response: %s",
session_id,
context_file,
response,
)
return
# Append the summary to today's daily log.
if "FLUSH_OK" in response:
logging.info("Result: FLUSH_OK")
append_to_daily_log(
"FLUSH_OK - Nothing worth saving from this session", "Memory Flush"
)
elif "FLUSH_ERROR" in response:
logging.error("Result: %s", response)
append_to_daily_log(response, "Memory Flush")
else:
logging.info("Result: saved to daily log (%d chars)", len(response))
append_to_daily_log(response, "Session")
# Update dedup state
# Update dedup state (atomic) so a duplicate flush within 60s is suppressed.
save_flush_state({"session_id": session_id, "timestamp": time.time()})
# Clean up context file
# Clean up context file — only on successful append.
context_file.unlink(missing_ok=True)
# End-of-day auto-compilation: if it's past the compile hour and today's
# log hasn't been compiled yet, trigger compile.py in the background.
# Staleness-triggered auto-compilation: fires if today's log has changed
# and the compile-interval has elapsed. See maybe_trigger_compilation.
maybe_trigger_compilation()
logging.info("Flush complete for session %s", session_id)

248
scripts/fs_utils.py Normal file
View file

@ -0,0 +1,248 @@
"""Filesystem + state-integrity helpers for memoria.
Provides the primitives the rest of the code uses to avoid the common
failure modes: interrupted writes truncating state files, concurrent
appends interleaving into daily logs, and LLM-authored wikilink slugs
escaping the knowledge directory.
All helpers are defensive by design callers should NOT wrap these in
their own try/except for the ordinary paths. Specific exceptions
(StateCorruptionError, UnsafePathError) are raised for conditions the
caller may want to handle; everything else bubbles.
"""
from __future__ import annotations
import json
import logging
import os
import re
import sys
import tempfile
from pathlib import Path
from typing import Any
# ── File locking ────────────────────────────────────────────────────────
# POSIX: fcntl.flock (advisory, per-process).
# Windows: msvcrt.locking (mandatory, byte-range). Falls back to best-effort
# (no lock) with a warning if neither is available — should never happen.
_PLATFORM_POSIX = sys.platform != "win32"
if _PLATFORM_POSIX:
import fcntl
def _lock_exclusive(fd: int) -> None:
fcntl.flock(fd, fcntl.LOCK_EX)
def _unlock(fd: int) -> None:
fcntl.flock(fd, fcntl.LOCK_UN)
else:
try:
import msvcrt # type: ignore[import-not-found]
def _lock_exclusive(fd: int) -> None:
msvcrt.locking(fd, msvcrt.LK_LOCK, 1) # type: ignore[attr-defined]
def _unlock(fd: int) -> None:
msvcrt.locking(fd, msvcrt.LK_UNLCK, 1) # type: ignore[attr-defined]
except ImportError: # pragma: no cover
def _lock_exclusive(fd: int) -> None:
logging.warning("File locking unavailable on this platform; concurrent writes may corrupt data.")
def _unlock(fd: int) -> None:
pass
# ── Atomic writes ───────────────────────────────────────────────────────
def atomic_write_text(path: Path, content: str, encoding: str = "utf-8") -> None:
"""Write text to `path` such that the file is either fully written or unchanged.
Writes to a temp file in the same directory, fsyncs the file and (best-effort)
the parent directory, then atomically renames into place via os.replace. On
crash during write, `path` retains its pre-call contents; a `.tmp.*` may be
left behind for manual cleanup.
"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
# NamedTemporaryFile in same dir so os.replace stays same-filesystem.
tmp = tempfile.NamedTemporaryFile(
mode="w",
encoding=encoding,
dir=str(path.parent),
prefix=f".{path.name}.",
suffix=".tmp",
delete=False,
)
try:
tmp.write(content)
tmp.flush()
os.fsync(tmp.fileno())
tmp.close()
os.replace(tmp.name, path)
_fsync_dir_best_effort(path.parent)
except Exception:
# Best-effort cleanup of the temp file on failure.
try:
os.unlink(tmp.name)
except OSError:
pass
raise
def _fsync_dir_best_effort(directory: Path) -> None:
"""Fsync the containing directory so the rename is durable on POSIX.
Silently no-ops on Windows (os.open of a directory isn't supported there).
"""
if not _PLATFORM_POSIX:
return
try:
dir_fd = os.open(str(directory), os.O_RDONLY)
except OSError:
return
try:
os.fsync(dir_fd)
except OSError:
pass
finally:
os.close(dir_fd)
# ── Locked append ───────────────────────────────────────────────────────
def locked_append_text(path: Path, content: str, encoding: str = "utf-8") -> None:
"""Append `content` to `path` under an exclusive file lock.
Multiple processes calling this concurrently will serialize; writes from
different callers never interleave within one call's content. The lock is
advisory on POSIX (cooperative all writers must use this helper).
"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
# 'a' mode: O_APPEND → each write appends at the current end atomically (up
# to PIPE_BUF on Linux for a single write(), but we can't rely on content <
# PIPE_BUF). The lock guarantees full-entry atomicity regardless of size.
with open(path, "a", encoding=encoding) as f:
fd = f.fileno()
_lock_exclusive(fd)
try:
f.write(content)
f.flush()
os.fsync(fd)
finally:
_unlock(fd)
# ── Wikilink parsing ────────────────────────────────────────────────────
# Matches [[target]] and [[target|display]]. Returns just `target`.
_WIKILINK_RE = re.compile(r"\[\[([^\]|]+)(?:\|[^\]]*)?\]\]")
def extract_wikilinks(content: str) -> list[str]:
"""Return all wikilink targets from `content`, aliases stripped.
`[[concepts/foo]]` "concepts/foo"
`[[concepts/foo|Display]]` "concepts/foo"
"""
return _WIKILINK_RE.findall(content)
def parse_wikilink(raw: str) -> str:
"""Strip the pipe-alias suffix from a raw wikilink target, if any."""
pipe = raw.find("|")
return raw[:pipe].strip() if pipe != -1 else raw.strip()
# ── Path safety ─────────────────────────────────────────────────────────
class UnsafePathError(ValueError):
"""Raised when a path escapes its allowed base directory."""
def safe_article_path(link: str, knowledge_dir: Path) -> Path | None:
"""Resolve a wikilink slug to a path inside `knowledge_dir`.
Strips any `|display` alias, appends `.md`, resolves the path, and asserts
it remains inside `knowledge_dir`. Returns None if the link is empty/invalid
or the resolved path escapes the base.
Does NOT check whether the file exists use `path.exists()` separately.
"""
slug = parse_wikilink(link)
if not slug or slug.startswith("/") or "\0" in slug:
return None
base = knowledge_dir.resolve()
candidate = (knowledge_dir / f"{slug}.md").resolve()
try:
candidate.relative_to(base)
except ValueError:
return None
return candidate
# ── JSON with recovery ──────────────────────────────────────────────────
class StateCorruptionError(RuntimeError):
"""Raised when a state file is unreadable or not valid JSON.
Attribute `backup_path` indicates where the corrupted file was moved.
Attribute `default` contains the default state the caller should use.
"""
def __init__(self, message: str, backup_path: Path | None, default: Any):
super().__init__(message)
self.backup_path = backup_path
self.default = default
def load_json_with_recovery(
path: Path,
default: Any,
*,
logger: logging.Logger | None = None,
) -> Any:
"""Load JSON from `path`; on corruption, move aside + return `default`.
Preserves the corrupted file at `<path>.bak-<timestamp>` so operators
can inspect what was lost. Logs a warning via the provided logger
(or the root logger) with the backup path.
"""
path = Path(path)
if not path.exists():
return default
try:
return json.loads(path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError, UnicodeDecodeError) as exc:
log = logger or logging.getLogger()
from datetime import datetime, timezone
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
backup = path.with_suffix(path.suffix + f".bak-{ts}")
try:
os.replace(path, backup)
log.warning(
"State file corruption at %s: %s. Moved to %s; using default.",
path,
exc,
backup,
)
except OSError as move_exc:
log.error(
"State file corruption at %s: %s. Failed to back up: %s. Using default.",
path,
exc,
move_exc,
)
backup = None
return default

View file

@ -15,20 +15,40 @@ from config import (
QA_DIR,
STATE_FILE,
)
from fs_utils import (
atomic_write_text,
extract_wikilinks as _extract_wikilinks,
load_json_with_recovery,
parse_wikilink,
safe_article_path,
)
_DEFAULT_STATE: dict = {"ingested": {}, "query_count": 0, "last_lint": None, "total_cost": 0.0}
# ── State management ──────────────────────────────────────────────────
def load_state() -> dict:
"""Load persistent state from state.json."""
if STATE_FILE.exists():
return json.loads(STATE_FILE.read_text(encoding="utf-8"))
return {"ingested": {}, "query_count": 0, "last_lint": None, "total_cost": 0.0}
"""Load persistent state from state.json.
On file corruption, moves the bad file aside with a timestamped backup,
logs a warning, and returns a fresh default state. This avoids the
silent "full recompile" failure mode that would otherwise follow a
JSONDecodeError.
"""
# Return a *copy* of defaults so mutations by the caller don't pollute
# the module-level default.
return load_json_with_recovery(STATE_FILE, dict(_DEFAULT_STATE))
def save_state(state: dict) -> None:
"""Save state to state.json."""
STATE_FILE.write_text(json.dumps(state, indent=2), encoding="utf-8")
"""Save state atomically (tmp + fsync + rename).
Interruptions during write leave state.json in its previous good state.
The partial tmp file is cleaned up on exception.
"""
atomic_write_text(STATE_FILE, json.dumps(state, indent=2))
# ── File hashing ──────────────────────────────────────────────────────
@ -52,14 +72,25 @@ def slugify(text: str) -> str:
# ── Wikilink helpers ──────────────────────────────────────────────────
def extract_wikilinks(content: str) -> list[str]:
"""Extract all [[wikilinks]] from markdown content."""
return re.findall(r"\[\[([^\]]+)\]\]", content)
"""Extract all [[wikilinks]] from markdown content.
Pipe-aliased links ([[target|display]]) are returned as the bare
`target` slug display text is stripped. Callers should treat the
return value as filesystem-relative slugs, not raw link text.
"""
return _extract_wikilinks(content)
def wiki_article_exists(link: str) -> bool:
"""Check if a wikilinked article exists on disk."""
path = KNOWLEDGE_DIR / f"{link}.md"
return path.exists()
"""Check if a wikilinked article exists on disk.
Resolves via `safe_article_path`: pipe-alias is stripped, the final
path is asserted to remain inside `KNOWLEDGE_DIR`, and traversal
attempts (`../../etc/passwd`) return False without touching the
filesystem outside the knowledge tree.
"""
path = safe_article_path(link, KNOWLEDGE_DIR)
return path is not None and path.exists()
# ── Wiki content helpers ──────────────────────────────────────────────
@ -105,13 +136,19 @@ def list_raw_files() -> list[Path]:
# ── Index helpers ─────────────────────────────────────────────────────
def count_inbound_links(target: str, exclude_file: Path | None = None) -> int:
"""Count how many wiki articles link to a given target."""
"""Count how many wiki articles link to a given target.
Correctly handles pipe-aliased links an article containing
`[[concepts/foo|Display]]` counts as an inbound link to
`concepts/foo`.
"""
target_slug = parse_wikilink(target)
count = 0
for article in list_wiki_articles():
if article == exclude_file:
continue
content = article.read_text(encoding="utf-8")
if f"[[{target}]]" in content:
if any(parse_wikilink(raw) == target_slug for raw in _extract_wikilinks(content)):
count += 1
return count