This is the initial fork commit for agent-admin/memoria, a production
hardening of coleam00/claude-memory-compiler. It addresses all four P0
findings from the bug audit (atomic state writes, file locking on
daily log appends, subprocess detachment, path-traversal guard) plus
several P1s (aliased wikilinks, timezone wiring, staleness-based
compile trigger, SDK retry with backoff, file-handle context manager).
File-level changes:
- LICENSE — MIT (fork is self-declared FOSS; upstream has no LICENSE
file but author has stated FOSS intent).
- pyproject.toml — renamed project to `memoria`, removed unused
python-dotenv dependency, added optional `test` dep group.
- scripts/fs_utils.py — NEW module containing the primitives that
the other patches rely on:
* atomic_write_text(path, content): tmp + fsync + os.replace;
interrupted writes leave the target unchanged.
* locked_append_text(path, content): fcntl.flock (POSIX) /
msvcrt.locking (Windows) exclusive lock around the write so
concurrent callers never interleave.
* extract_wikilinks / parse_wikilink: strip [[target|display]]
aliases correctly (fixes upstream issues #7 and #8).
* safe_article_path(link, base): resolves a wikilink slug inside
a base dir or returns None (path traversal guard).
* load_json_with_recovery(path, default): on corruption, moves
the bad file aside with a timestamped .bak-YYYYMMDDTHHMMSSZ
suffix, logs a warning, and returns the default. Replaces the
silent `{}` return that would otherwise trigger a full recompile.
- scripts/utils.py — save_state/load_state now use atomic writes and
corruption recovery; wiki_article_exists + count_inbound_links now
alias-aware via fs_utils helpers.
- scripts/config.py — TIMEZONE is now wired via zoneinfo.ZoneInfo
and used by now_iso/today_iso (previously defined but ignored).
Overridable via MEMORIA_TZ env var. Unknown zones log a warning
and fall back to system local time rather than crashing.
- scripts/flush.py —
* save_flush_state / load_flush_state use atomic + recovery.
* append_to_daily_log uses locked_append_text; concurrent flush
+ pre-compact calls can no longer interleave log entries.
* run_flush retries SDK failures up to MAX_SDK_ATTEMPTS=3 with
exponential backoff (2s, 4s) before returning FLUSH_ERROR.
* On FLUSH_ERROR, main() preserves the context file and does NOT
update dedup state — the next flush retries cleanly instead of
the failure being silently swallowed.
* Explicit model="haiku" for flush (short summarization task).
* maybe_trigger_compilation replaced: 6 PM wall-clock gate is
gone; trigger is now staleness-based (hash changed AND
COMPILE_INTERVAL_MIN elapsed since last compile). Configurable
via MEMORIA_COMPILE_INTERVAL_MIN. Uses _now_local() from
config so the clock respects the configured timezone.
* compile.log handle uses a `with open()` context manager so the
fd is always cleaned up, even if Popen throws.
- hooks/session-end.py, hooks/pre-compact.py — subprocess.Popen now
passes start_new_session=True on POSIX, detaching flush.py from
the hook's process group so it survives post-hook SIGHUP. Fixes
the intermittent-data-loss failure mode where flush subprocess
was killed mid-LLM-call.
Tests (a formal acceptance suite is still to come in this phase): each
helper was verified by exercising it in scratch directories — atomic
write round-trip, corruption recovery with .bak creation, alias parsing,
and path-traversal rejection.
Upstream issue mapping: #3/#5/#9 addressed by the next commit
(compile.py + query.py scaling fix). #7/#8 addressed here via
alias-aware helpers. License (#11) resolved via MIT LICENSE.
174 lines · 5.4 KiB · Python
"""
|
|
PreCompact hook - captures conversation transcript before auto-compaction.

When Claude Code's context window fills up, it auto-compacts (summarizes and
discards detail). This hook fires BEFORE that happens, extracting conversation
context and spawning flush.py to extract knowledge that would otherwise
be lost to summarization.

The hook itself does NO API calls - only local file I/O for speed (<10s).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Recursion guard: exit immediately (status 0) when CLAUDE_INVOKED_BY is set to
# a non-empty value. NOTE(review): presumably set by the flush pipeline when it
# launches Claude itself, so its own sessions do not re-trigger this hook —
# confirm against flush.py.
if os.getenv("CLAUDE_INVOKED_BY"):
    raise SystemExit(0)

# Repository layout: this file sits one directory below the project root.
ROOT = Path(__file__).resolve().parent.parent
SCRIPTS_DIR = ROOT.joinpath("scripts")
# Context files handed to flush.py are written next to the scripts.
STATE_DIR = SCRIPTS_DIR

# Log to scripts/flush.log, tagging every record with [pre-compact].
logging.basicConfig(
    level=logging.INFO,
    filename=os.fspath(SCRIPTS_DIR / "flush.log"),
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s %(levelname)s [pre-compact] %(message)s",
)

# Extraction limits used by the hook.
MIN_TURNS_TO_FLUSH = 5  # skip the flush when fewer turns than this were found
MAX_TURNS = 30  # keep at most this many trailing conversation turns
MAX_CONTEXT_CHARS = 15000  # cap on the extracted context, in characters
|
|
|
|
|
|
def extract_conversation_context(transcript_path: Path) -> tuple[str, int]:
|
|
"""Read JSONL transcript and extract last ~N conversation turns as markdown."""
|
|
turns: list[str] = []
|
|
|
|
with open(transcript_path, encoding="utf-8") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
entry = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
msg = entry.get("message", {})
|
|
if isinstance(msg, dict):
|
|
role = msg.get("role", "")
|
|
content = msg.get("content", "")
|
|
else:
|
|
role = entry.get("role", "")
|
|
content = entry.get("content", "")
|
|
|
|
if role not in ("user", "assistant"):
|
|
continue
|
|
|
|
if isinstance(content, list):
|
|
text_parts = []
|
|
for block in content:
|
|
if isinstance(block, dict) and block.get("type") == "text":
|
|
text_parts.append(block.get("text", ""))
|
|
elif isinstance(block, str):
|
|
text_parts.append(block)
|
|
content = "\n".join(text_parts)
|
|
|
|
if isinstance(content, str) and content.strip():
|
|
label = "User" if role == "user" else "Assistant"
|
|
turns.append(f"**{label}:** {content.strip()}\n")
|
|
|
|
recent = turns[-MAX_TURNS:]
|
|
context = "\n".join(recent)
|
|
|
|
if len(context) > MAX_CONTEXT_CHARS:
|
|
context = context[-MAX_CONTEXT_CHARS:]
|
|
boundary = context.find("\n**")
|
|
if boundary > 0:
|
|
context = context[boundary + 1 :]
|
|
|
|
return context, len(recent)
|
|
|
|
|
|
def main() -> None:
    """Handle one PreCompact hook invocation.

    Reads the hook payload (a JSON object) from stdin, extracts the recent
    conversation from the transcript it references, writes that context to a
    ``flush-context-*.md`` file in ``STATE_DIR`` and spawns ``flush.py`` as a
    background process to consume it.

    The function never raises for expected failures: malformed input, a
    missing transcript, too little conversation, or I/O errors are logged and
    cause an early return so the hook itself does not fail.
    """
    # Read hook input from stdin
    try:
        raw_input = sys.stdin.read()
        try:
            hook_input: dict = json.loads(raw_input)
        except json.JSONDecodeError:
            # Retry after doubling lone backslashes (those not already part of
            # an escaped backslash or quote). NOTE(review): presumably this is
            # for unescaped Windows paths in the payload — confirm.
            fixed_input = re.sub(r'(?<!\\)\\(?!["\\])', r'\\\\', raw_input)
            hook_input = json.loads(fixed_input)
    except (json.JSONDecodeError, ValueError, EOFError) as e:
        logging.error("Failed to parse stdin: %s", e)
        return

    # Valid JSON that is not an object (e.g. [] or null) has no fields to read.
    if not isinstance(hook_input, dict):
        logging.error(
            "Failed to parse stdin: expected a JSON object, got %s",
            type(hook_input).__name__,
        )
        return

    # Coerce to str: the id is used both in a filename and as a process
    # argument, and Popen rejects non-string arguments.
    session_id = str(hook_input.get("session_id", "unknown"))
    transcript_path_str = hook_input.get("transcript_path", "")

    logging.info("PreCompact fired: session=%s", session_id)

    # transcript_path can be empty (known Claude Code bug #13668)
    if not transcript_path_str or not isinstance(transcript_path_str, str):
        logging.info("SKIP: no transcript path")
        return

    transcript_path = Path(transcript_path_str)
    if not transcript_path.exists():
        logging.info("SKIP: transcript missing: %s", transcript_path_str)
        return

    # Extract conversation context in the hook
    try:
        context, turn_count = extract_conversation_context(transcript_path)
    except Exception as e:
        logging.error("Context extraction failed: %s", e)
        return

    if not context.strip():
        logging.info("SKIP: empty context")
        return

    if turn_count < MIN_TURNS_TO_FLUSH:
        logging.info("SKIP: only %d turns (min %d)", turn_count, MIN_TURNS_TO_FLUSH)
        return

    # Write context to a temp file for the background process.
    # session_id comes from stdin, so strip anything that could act as a path
    # separator or traversal component before using it in a filename. The
    # unmodified id is still what gets passed to flush.py below.
    safe_session = re.sub(r"[^A-Za-z0-9_-]", "_", session_id) or "unknown"
    timestamp = datetime.now(timezone.utc).astimezone().strftime("%Y%m%d-%H%M%S")
    context_file = STATE_DIR / f"flush-context-{safe_session}-{timestamp}.md"
    try:
        context_file.write_text(context, encoding="utf-8")
    except OSError as e:
        logging.error("Failed to write context file %s: %s", context_file, e)
        return

    # Spawn flush.py as a background process
    flush_script = SCRIPTS_DIR / "flush.py"

    cmd = [
        "uv",
        "run",
        "--directory",
        str(ROOT),
        "python",
        str(flush_script),
        str(context_file),
        session_id,
    ]

    # On POSIX, start_new_session=True detaches the flush subprocess from the
    # hook's process group so it survives CC's post-hook cleanup signals.
    popen_kwargs: dict = {
        "stdout": subprocess.DEVNULL,
        "stderr": subprocess.DEVNULL,
    }
    if sys.platform == "win32":
        popen_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
    else:
        popen_kwargs["start_new_session"] = True

    try:
        subprocess.Popen(cmd, **popen_kwargs)
    except Exception as e:
        logging.error("Failed to spawn flush.py: %s", e)
        # The extracted context is left on disk so it is not lost; record
        # where it is, since no process was started to consume it.
        logging.error("Unprocessed context kept at %s", context_file)
    else:
        logging.info(
            "Spawned flush.py for session %s (%d turns, %d chars)",
            session_id,
            turn_count,
            len(context),
        )
|
|
|
|
|
|
# Script entry point: run the hook only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()
|