memoria/tests/test_fs_utils.py
agent-admin 347d191935 fork: tests (29 green) + fork README + pytest config
Acceptance test suite under tests/ covers 8 of the 10 audit-defined
assertions directly (the 2 that require integration-level fixtures —
flush-subprocess-survives-hook-exit and whole-wiki-not-in-prompt
token-count — are documented as manual-test checks rather than
automated).

tests/test_fs_utils.py — 17 tests
  * Atomic write: roundtrip, overwrite, original-preserved-on-exception,
    parent-dir-creation.
  * Locked append: 4 concurrent workers × 25 entries each, asserts every
    entry appears exactly once and its body lines are contiguous. This
    is the acceptance criterion for "two concurrent flushes don't
    interleave writes."
  * JSON recovery: clean roundtrip, missing-file default, corruption
    produces timestamped .bak and returns default.
  * Wikilink parsing: bare / aliased / mixed; parse_wikilink strip.
  * Path safety: clean / traversal / absolute / empty / null-byte /
    aliased-but-safe.

tests/test_compile_chunking.py — 8 tests
  * Chunking: small log passthrough, byte-exact reconstruction,
    boundary respect, oversized-single-section, mixed-size packing.
  * State-on-failure: single-chunk SDK error does NOT update state;
    multi-chunk partial failure does NOT update state; all-chunks
    succeed DOES update state with hash + cost.

tests/test_lint_backlinks.py — 4 tests
  * Aliased wikilinks aren't flagged as broken links.
  * Aliased backlinks count as valid inbound references (the C9 fix).
  * QA articles referencing concepts don't trigger backlink suggestions.
  * Concept-to-concept asymmetry IS still reported (C9 scope is narrow).

FORK.md — fork-specific docs:
  * Summary of delta vs upstream (data-integrity, scaling, correctness,
    safety, configurability, hygiene categories)
  * Full env-var reference
  * Test invocation + coverage summary
  * Upstream sync guidance (cherry-pick, don't blind-pull)

Result: 29 passed in 0.07s. All patches in this fork verified via
automated test before any production use.
2026-04-24 17:54:00 -04:00

228 lines
7.8 KiB
Python

"""Tests for scripts/fs_utils.py primitives.
Covers audit assertions: concurrent flush safety (acceptance #1),
crash/corruption recovery (#2, #9), path-traversal rejection (#8),
aliased wikilinks (#6 — the parsing side).
"""
from __future__ import annotations
import json
import multiprocessing as mp
import os
import re
import time
from pathlib import Path
import pytest
from fs_utils import (
atomic_write_text,
extract_wikilinks,
load_json_with_recovery,
locked_append_text,
parse_wikilink,
safe_article_path,
)
# ── Atomic writes ───────────────────────────────────────────────────────
def test_atomic_write_roundtrip(tmp_path: Path) -> None:
target = tmp_path / "state.json"
atomic_write_text(target, '{"a": 1}')
assert target.read_text() == '{"a": 1}'
def test_atomic_write_overwrites(tmp_path: Path) -> None:
target = tmp_path / "state.json"
target.write_text("old content")
atomic_write_text(target, "new content")
assert target.read_text() == "new content"
def test_atomic_write_leaves_original_on_exception(tmp_path: Path, monkeypatch) -> None:
"""Simulate a write that fails mid-way. Target must retain old content."""
target = tmp_path / "state.json"
target.write_text("old content")
# Patch os.replace to raise — simulates a filesystem error at the
# critical moment between tmp-write and atomic rename.
import fs_utils as fsu
original_replace = os.replace
def boom(*args, **kwargs):
raise OSError("simulated filesystem failure")
monkeypatch.setattr(os, "replace", boom)
with pytest.raises(OSError, match="simulated"):
atomic_write_text(target, "new content")
# Original is intact.
assert target.read_text() == "old content"
# No leftover tmp files.
monkeypatch.setattr(os, "replace", original_replace)
tmp_leftovers = list(tmp_path.glob(".state.json.*.tmp"))
assert tmp_leftovers == []
def test_atomic_write_creates_parent_dirs(tmp_path: Path) -> None:
target = tmp_path / "nested" / "deep" / "state.json"
atomic_write_text(target, "ok")
assert target.read_text() == "ok"
# ── Locked append (acceptance #1) ───────────────────────────────────────
def _appender_worker(path_str: str, marker: str, count: int) -> None:
"""Worker: append `count` well-bounded entries to `path_str`.
Each entry is a full 'header + body + blank line' unit; if writes
interleave, the stream will contain a partial entry.
"""
from fs_utils import locked_append_text # re-import in worker
for i in range(count):
entry = (
f"### {marker} entry {i}\n"
f"body line 1 of {marker}-{i}\n"
f"body line 2 of {marker}-{i}\n"
f"\n"
)
locked_append_text(Path(path_str), entry)
def test_locked_append_no_interleaving(tmp_path: Path) -> None:
"""Run many concurrent appenders; verify no entry is split or interleaved."""
target = tmp_path / "daily.md"
n_workers = 4
entries_per_worker = 25
markers = [f"W{i}" for i in range(n_workers)]
ctx = mp.get_context("fork")
procs = [
ctx.Process(target=_appender_worker, args=(str(target), m, entries_per_worker))
for m in markers
]
for p in procs:
p.start()
for p in procs:
p.join(timeout=30)
assert p.exitcode == 0, f"worker {p} failed"
content = target.read_text()
# Every expected entry must appear exactly once and each entry's body
# lines must be contiguous (no interleaving).
for marker in markers:
for i in range(entries_per_worker):
header = f"### {marker} entry {i}\n"
body1 = f"body line 1 of {marker}-{i}"
body2 = f"body line 2 of {marker}-{i}"
# Header appears exactly once.
assert content.count(header) == 1, f"missing or duplicate header: {header}"
# Body line immediately follows header (no interleaving).
idx = content.index(header)
tail = content[idx + len(header):]
assert tail.startswith(body1 + "\n" + body2 + "\n"), (
f"entry {marker}-{i} body was interleaved or split"
)
# ── JSON recovery (acceptance #9) ───────────────────────────────────────
def test_json_recovery_roundtrip_clean(tmp_path: Path) -> None:
target = tmp_path / "state.json"
target.write_text(json.dumps({"a": 1}))
assert load_json_with_recovery(target, {}) == {"a": 1}
def test_json_recovery_missing_returns_default(tmp_path: Path) -> None:
target = tmp_path / "does-not-exist.json"
assert load_json_with_recovery(target, {"fresh": True}) == {"fresh": True}
def test_json_recovery_corruption_creates_backup(tmp_path: Path) -> None:
target = tmp_path / "state.json"
target.write_text("{ this is not valid json")
default = {"ingested": {}}
result = load_json_with_recovery(target, default)
# Returned the default.
assert result == default
# Corrupted file moved aside with .bak-YYYYMMDDTHHMMSSZ suffix.
bak_files = list(tmp_path.glob("state.json.bak-*"))
assert len(bak_files) == 1, f"expected one .bak file, found {bak_files}"
assert re.search(r"bak-\d{8}T\d{6}Z$", bak_files[0].name)
# The corrupt content is preserved in the backup.
assert bak_files[0].read_text() == "{ this is not valid json"
# The main target no longer exists (was moved, not deleted).
assert not target.exists()
# ── Wikilink parsing (acceptance #6) ────────────────────────────────────
def test_extract_wikilinks_bare() -> None:
assert extract_wikilinks("see [[concepts/foo]] and [[concepts/bar]]") == [
"concepts/foo",
"concepts/bar",
]
def test_extract_wikilinks_aliased() -> None:
assert extract_wikilinks(
"ref [[concepts/foo|Foo Display]] and [[concepts/bar|Bar]]"
) == ["concepts/foo", "concepts/bar"]
def test_extract_wikilinks_mixed() -> None:
assert extract_wikilinks(
"[[concepts/a]] and [[concepts/b|B]] and [[concepts/c]]"
) == ["concepts/a", "concepts/b", "concepts/c"]
def test_parse_wikilink() -> None:
assert parse_wikilink("concepts/foo") == "concepts/foo"
assert parse_wikilink("concepts/foo|Display") == "concepts/foo"
assert parse_wikilink(" spaces ") == "spaces"
# ── Path traversal (acceptance #8) ──────────────────────────────────────
def test_safe_path_clean(tmp_path: Path) -> None:
base = tmp_path / "kb"
base.mkdir()
result = safe_article_path("concepts/foo", base)
assert result is not None
assert result == (base / "concepts" / "foo.md").resolve()
def test_safe_path_traversal_rejected(tmp_path: Path) -> None:
base = tmp_path / "kb"
base.mkdir()
assert safe_article_path("../../etc/passwd", base) is None
assert safe_article_path("../outside", base) is None
def test_safe_path_absolute_rejected(tmp_path: Path) -> None:
base = tmp_path / "kb"
base.mkdir()
assert safe_article_path("/etc/passwd", base) is None
def test_safe_path_empty_and_invalid(tmp_path: Path) -> None:
base = tmp_path / "kb"
base.mkdir()
assert safe_article_path("", base) is None
assert safe_article_path("foo\0bar", base) is None # null byte
def test_safe_path_strips_alias(tmp_path: Path) -> None:
base = tmp_path / "kb"
base.mkdir()
result = safe_article_path("concepts/foo|Display", base)
assert result == (base / "concepts" / "foo.md").resolve()