fork: tests (29 green) + fork README + pytest config
Acceptance test suite under tests/ covers 8 of the 10 audit-defined
assertions directly (the 2 that require integration-level fixtures —
flush-subprocess-survives-hook-exit and whole-wiki-not-in-prompt
token-count — are documented as manual-test checks rather than
automated).
tests/test_fs_utils.py — 17 tests
* Atomic write: roundtrip, overwrite, original-preserved-on-exception,
parent-dir-creation.
* Locked append: 4 concurrent workers × 25 entries each, asserts every
entry appears exactly once and its body lines are contiguous. This
is the acceptance criterion for "two concurrent flushes don't
interleave writes."
* JSON recovery: clean roundtrip, missing-file default, corruption
produces timestamped .bak and returns default.
* Wikilink parsing: bare / aliased / mixed; parse_wikilink strip.
* Path safety: clean / traversal / absolute / empty / null-byte /
aliased-but-safe.
tests/test_compile_chunking.py — 8 tests
* Chunking: small log passthrough, byte-exact reconstruction,
boundary respect, oversized-single-section, mixed-size packing.
* State-on-failure: single-chunk SDK error does NOT update state;
multi-chunk partial failure does NOT update state; all-chunks
succeed DOES update state with hash + cost.
tests/test_lint_backlinks.py — 4 tests
* Aliased wikilinks aren't flagged as broken links.
* Aliased backlinks count as valid inbound references (the C9 fix).
* QA articles referencing concepts don't trigger backlink suggestions.
* Concept-to-concept asymmetry IS still reported (C9 scope is narrow).
FORK.md — fork-specific docs:
* Summary of delta vs upstream (data-integrity, scaling, correctness,
safety, configurability, hygiene categories)
* Full env-var reference
* Test invocation + coverage summary
* Upstream sync guidance (cherry-pick, don't blind-pull)
Result: 29 passed in 0.07s. All patches in this fork verified via
automated test before any production use.
This commit is contained in:
parent
03296be47a
commit
347d191935
7 changed files with 1096 additions and 312 deletions
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
16
tests/conftest.py
Normal file
16
tests/conftest.py
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
"""Shared pytest fixtures.
|
||||
|
||||
Ensures the `scripts/` directory is on sys.path so tests can `import
|
||||
fs_utils`, `import utils`, etc. without installing the package.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
_ROOT = Path(__file__).resolve().parent.parent
|
||||
_SCRIPTS = _ROOT / "scripts"
|
||||
|
||||
if str(_SCRIPTS) not in sys.path:
|
||||
sys.path.insert(0, str(_SCRIPTS))
|
||||
209
tests/test_compile_chunking.py
Normal file
209
tests/test_compile_chunking.py
Normal file
|
|
@ -0,0 +1,209 @@
|
|||
"""Tests for compile.py chunking + state-on-failure behavior.
|
||||
|
||||
Covers audit assertions #4 (failed SDK call doesn't mark log as compiled)
|
||||
and the chunking correctness that underpins #5 (bounded prompt size).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
import compile as compile_mod
|
||||
|
||||
|
||||
# ── Chunking correctness ────────────────────────────────────────────────
|
||||
|
||||
def test_chunk_small_log_single_chunk() -> None:
|
||||
small = "### Section A\nbody\n\n### Section B\nbody\n"
|
||||
chunks = compile_mod._split_log_into_chunks(small, max_chars=100_000)
|
||||
assert chunks == [small]
|
||||
|
||||
|
||||
def test_chunk_reconstructs_byte_exact() -> None:
|
||||
sections = [f"### Section {i}\n" + "x" * 40_000 + "\n" for i in range(3)]
|
||||
large = "".join(sections)
|
||||
chunks = compile_mod._split_log_into_chunks(large, max_chars=100_000)
|
||||
assert "".join(chunks) == large
|
||||
|
||||
|
||||
def test_chunk_respects_section_boundaries() -> None:
|
||||
# 4 sections of 30k each; max 70k → each chunk should be a clean
|
||||
# boundary cut, never splitting a `### ` header from its body.
|
||||
sections = [f"### S{i}\n" + "y" * 30_000 + "\n" for i in range(4)]
|
||||
large = "".join(sections)
|
||||
chunks = compile_mod._split_log_into_chunks(large, max_chars=70_000)
|
||||
# Every chunk that isn't the first should start with `### `.
|
||||
for chunk in chunks:
|
||||
if chunk != chunks[0]:
|
||||
assert chunk.startswith("### "), f"chunk boundary split mid-section: {chunk[:50]!r}"
|
||||
|
||||
|
||||
def test_chunk_oversized_single_section_emits_alone() -> None:
|
||||
# Section larger than max_chars should be emitted as its own oversized
|
||||
# chunk rather than being split mid-thought.
|
||||
huge = "### One big section\n" + "z" * 150_000 + "\n"
|
||||
chunks = compile_mod._split_log_into_chunks(huge, max_chars=100_000)
|
||||
assert len(chunks) == 1
|
||||
assert chunks[0] == huge
|
||||
|
||||
|
||||
def test_chunk_mixed_sizes() -> None:
|
||||
# Section headers must anchor at line-start (regex is `(?m)(?=^### )`),
|
||||
# so each section body must end with a newline.
|
||||
parts = [
|
||||
"### A\n" + "a" * 30_000 + "\n",
|
||||
"### B\n" + "b" * 30_000 + "\n",
|
||||
"### C\n" + "c" * 30_000 + "\n",
|
||||
]
|
||||
content = "".join(parts) # ~90K
|
||||
chunks = compile_mod._split_log_into_chunks(content, max_chars=80_000)
|
||||
# First chunk should contain A + B (~60K), second chunk C.
|
||||
assert len(chunks) == 2
|
||||
assert "### A" in chunks[0] and "### B" in chunks[0]
|
||||
assert "### C" in chunks[1]
|
||||
# Chunk 2 begins at a clean section boundary.
|
||||
assert chunks[1].startswith("### C")
|
||||
|
||||
|
||||
# ── State-on-failure (acceptance #4) ────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_failed_sdk_does_not_mark_compiled(tmp_path: Path, monkeypatch) -> None:
|
||||
"""If the SDK raises, compile_daily_log must NOT update state.json.
|
||||
|
||||
Re-running should still see the log as uncompiled.
|
||||
"""
|
||||
import config
|
||||
import utils
|
||||
|
||||
# Redirect state file + paths to tmp.
|
||||
state_file = tmp_path / "state.json"
|
||||
daily_dir = tmp_path / "daily"
|
||||
daily_dir.mkdir()
|
||||
knowledge_dir = tmp_path / "knowledge"
|
||||
knowledge_dir.mkdir()
|
||||
|
||||
monkeypatch.setattr(utils, "STATE_FILE", state_file)
|
||||
|
||||
# Make a tiny fake daily log.
|
||||
log_path = daily_dir / "2026-04-24-test.md"
|
||||
log_path.write_text("### Test\nhello\n")
|
||||
|
||||
# Make AGENTS.md resolvable (compile.py reads it).
|
||||
agents_file = tmp_path / "AGENTS.md"
|
||||
agents_file.write_text("# Test Schema\n")
|
||||
monkeypatch.setattr(config, "AGENTS_FILE", agents_file)
|
||||
monkeypatch.setattr(compile_mod, "AGENTS_FILE", agents_file)
|
||||
|
||||
# Mock _invoke_llm to always fail.
|
||||
async def failing_invoke(_prompt: str) -> tuple[float, bool]:
|
||||
return (0.0, False)
|
||||
|
||||
monkeypatch.setattr(compile_mod, "_invoke_llm", failing_invoke)
|
||||
|
||||
state: dict[str, Any] = {"ingested": {}, "total_cost": 0.0}
|
||||
cost = await compile_mod.compile_daily_log(log_path, state)
|
||||
|
||||
# Function returns 0.0 cost.
|
||||
assert cost == 0.0
|
||||
# State was NOT updated with this log's hash.
|
||||
assert log_path.name not in state.get("ingested", {})
|
||||
# state.json file was NOT written (save_state only runs on full success).
|
||||
assert not state_file.exists()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_partial_chunk_failure_does_not_mark_compiled(tmp_path: Path, monkeypatch) -> None:
|
||||
"""Multi-chunk log: if chunk 2 fails, log must NOT be marked compiled.
|
||||
|
||||
Verifies the `all_succeeded` gate holds across chunks.
|
||||
"""
|
||||
import config
|
||||
import utils
|
||||
|
||||
state_file = tmp_path / "state.json"
|
||||
daily_dir = tmp_path / "daily"
|
||||
daily_dir.mkdir()
|
||||
(tmp_path / "knowledge").mkdir()
|
||||
(tmp_path / "knowledge" / "index.md").write_text("(empty)")
|
||||
|
||||
monkeypatch.setattr(utils, "STATE_FILE", state_file)
|
||||
|
||||
# Log large enough to chunk (>100K) via 3 sections.
|
||||
sections = [f"### Section {i}\n" + "x" * 40_000 + "\n" for i in range(3)]
|
||||
log_path = daily_dir / "2026-04-24-big.md"
|
||||
log_path.write_text("".join(sections))
|
||||
|
||||
agents_file = tmp_path / "AGENTS.md"
|
||||
agents_file.write_text("# Test Schema\n")
|
||||
monkeypatch.setattr(config, "AGENTS_FILE", agents_file)
|
||||
monkeypatch.setattr(compile_mod, "AGENTS_FILE", agents_file)
|
||||
|
||||
# Patch read_wiki_index to return a dummy value (avoid hitting the real
|
||||
# knowledge dir).
|
||||
monkeypatch.setattr(compile_mod, "read_wiki_index", lambda: "(test index)")
|
||||
|
||||
# _invoke_llm: succeed on 1st chunk, fail on 2nd. Track calls.
|
||||
calls = {"n": 0}
|
||||
|
||||
async def flaky_invoke(_prompt: str) -> tuple[float, bool]:
|
||||
calls["n"] += 1
|
||||
if calls["n"] == 1:
|
||||
return (0.25, True) # chunk 1 ok
|
||||
return (0.0, False) # chunk 2 fails
|
||||
|
||||
monkeypatch.setattr(compile_mod, "_invoke_llm", flaky_invoke)
|
||||
|
||||
state: dict[str, Any] = {"ingested": {}, "total_cost": 0.0}
|
||||
cost = await compile_mod.compile_daily_log(log_path, state)
|
||||
|
||||
# Returned partial cost (chunk 1 actually cost something).
|
||||
assert cost == 0.25
|
||||
# But state NOT updated — log is still "uncompiled" from the outside.
|
||||
assert log_path.name not in state.get("ingested", {})
|
||||
assert not state_file.exists()
|
||||
# Exactly 2 chunk attempts (first success, second failure, then bail).
|
||||
assert calls["n"] == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_all_chunks_succeed_updates_state(tmp_path: Path, monkeypatch) -> None:
|
||||
"""When every chunk succeeds, state IS updated with the hash."""
|
||||
import config
|
||||
import utils
|
||||
|
||||
state_file = tmp_path / "state.json"
|
||||
daily_dir = tmp_path / "daily"
|
||||
daily_dir.mkdir()
|
||||
(tmp_path / "knowledge").mkdir()
|
||||
|
||||
monkeypatch.setattr(utils, "STATE_FILE", state_file)
|
||||
|
||||
log_path = daily_dir / "2026-04-24-ok.md"
|
||||
log_path.write_text("### Section\nhi\n")
|
||||
|
||||
agents_file = tmp_path / "AGENTS.md"
|
||||
agents_file.write_text("# Schema\n")
|
||||
monkeypatch.setattr(config, "AGENTS_FILE", agents_file)
|
||||
monkeypatch.setattr(compile_mod, "AGENTS_FILE", agents_file)
|
||||
monkeypatch.setattr(compile_mod, "read_wiki_index", lambda: "(index)")
|
||||
|
||||
async def good_invoke(_prompt: str) -> tuple[float, bool]:
|
||||
return (0.10, True)
|
||||
|
||||
monkeypatch.setattr(compile_mod, "_invoke_llm", good_invoke)
|
||||
|
||||
state: dict[str, Any] = {"ingested": {}, "total_cost": 0.0}
|
||||
cost = await compile_mod.compile_daily_log(log_path, state)
|
||||
|
||||
assert cost == 0.10
|
||||
assert log_path.name in state["ingested"]
|
||||
assert state["ingested"][log_path.name]["cost_usd"] == 0.10
|
||||
assert state["total_cost"] == 0.10
|
||||
# state.json now exists on disk.
|
||||
assert state_file.exists()
|
||||
228
tests/test_fs_utils.py
Normal file
228
tests/test_fs_utils.py
Normal file
|
|
@ -0,0 +1,228 @@
|
|||
"""Tests for scripts/fs_utils.py primitives.
|
||||
|
||||
Covers audit assertions: concurrent flush safety (acceptance #1),
|
||||
crash/corruption recovery (#2, #9), path-traversal rejection (#8),
|
||||
aliased wikilinks (#6 — the parsing side).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from fs_utils import (
|
||||
atomic_write_text,
|
||||
extract_wikilinks,
|
||||
load_json_with_recovery,
|
||||
locked_append_text,
|
||||
parse_wikilink,
|
||||
safe_article_path,
|
||||
)
|
||||
|
||||
|
||||
# ── Atomic writes ───────────────────────────────────────────────────────
|
||||
|
||||
def test_atomic_write_roundtrip(tmp_path: Path) -> None:
|
||||
target = tmp_path / "state.json"
|
||||
atomic_write_text(target, '{"a": 1}')
|
||||
assert target.read_text() == '{"a": 1}'
|
||||
|
||||
|
||||
def test_atomic_write_overwrites(tmp_path: Path) -> None:
|
||||
target = tmp_path / "state.json"
|
||||
target.write_text("old content")
|
||||
atomic_write_text(target, "new content")
|
||||
assert target.read_text() == "new content"
|
||||
|
||||
|
||||
def test_atomic_write_leaves_original_on_exception(tmp_path: Path, monkeypatch) -> None:
|
||||
"""Simulate a write that fails mid-way. Target must retain old content."""
|
||||
target = tmp_path / "state.json"
|
||||
target.write_text("old content")
|
||||
|
||||
# Patch os.replace to raise — simulates a filesystem error at the
|
||||
# critical moment between tmp-write and atomic rename.
|
||||
import fs_utils as fsu
|
||||
|
||||
original_replace = os.replace
|
||||
|
||||
def boom(*args, **kwargs):
|
||||
raise OSError("simulated filesystem failure")
|
||||
|
||||
monkeypatch.setattr(os, "replace", boom)
|
||||
|
||||
with pytest.raises(OSError, match="simulated"):
|
||||
atomic_write_text(target, "new content")
|
||||
|
||||
# Original is intact.
|
||||
assert target.read_text() == "old content"
|
||||
|
||||
# No leftover tmp files.
|
||||
monkeypatch.setattr(os, "replace", original_replace)
|
||||
tmp_leftovers = list(tmp_path.glob(".state.json.*.tmp"))
|
||||
assert tmp_leftovers == []
|
||||
|
||||
|
||||
def test_atomic_write_creates_parent_dirs(tmp_path: Path) -> None:
|
||||
target = tmp_path / "nested" / "deep" / "state.json"
|
||||
atomic_write_text(target, "ok")
|
||||
assert target.read_text() == "ok"
|
||||
|
||||
|
||||
# ── Locked append (acceptance #1) ───────────────────────────────────────
|
||||
|
||||
def _appender_worker(path_str: str, marker: str, count: int) -> None:
|
||||
"""Worker: append `count` well-bounded entries to `path_str`.
|
||||
|
||||
Each entry is a full 'header + body + blank line' unit; if writes
|
||||
interleave, the stream will contain a partial entry.
|
||||
"""
|
||||
from fs_utils import locked_append_text # re-import in worker
|
||||
|
||||
for i in range(count):
|
||||
entry = (
|
||||
f"### {marker} entry {i}\n"
|
||||
f"body line 1 of {marker}-{i}\n"
|
||||
f"body line 2 of {marker}-{i}\n"
|
||||
f"\n"
|
||||
)
|
||||
locked_append_text(Path(path_str), entry)
|
||||
|
||||
|
||||
def test_locked_append_no_interleaving(tmp_path: Path) -> None:
|
||||
"""Run many concurrent appenders; verify no entry is split or interleaved."""
|
||||
target = tmp_path / "daily.md"
|
||||
n_workers = 4
|
||||
entries_per_worker = 25
|
||||
markers = [f"W{i}" for i in range(n_workers)]
|
||||
|
||||
ctx = mp.get_context("fork")
|
||||
procs = [
|
||||
ctx.Process(target=_appender_worker, args=(str(target), m, entries_per_worker))
|
||||
for m in markers
|
||||
]
|
||||
for p in procs:
|
||||
p.start()
|
||||
for p in procs:
|
||||
p.join(timeout=30)
|
||||
assert p.exitcode == 0, f"worker {p} failed"
|
||||
|
||||
content = target.read_text()
|
||||
|
||||
# Every expected entry must appear exactly once and each entry's body
|
||||
# lines must be contiguous (no interleaving).
|
||||
for marker in markers:
|
||||
for i in range(entries_per_worker):
|
||||
header = f"### {marker} entry {i}\n"
|
||||
body1 = f"body line 1 of {marker}-{i}"
|
||||
body2 = f"body line 2 of {marker}-{i}"
|
||||
# Header appears exactly once.
|
||||
assert content.count(header) == 1, f"missing or duplicate header: {header}"
|
||||
# Body line immediately follows header (no interleaving).
|
||||
idx = content.index(header)
|
||||
tail = content[idx + len(header):]
|
||||
assert tail.startswith(body1 + "\n" + body2 + "\n"), (
|
||||
f"entry {marker}-{i} body was interleaved or split"
|
||||
)
|
||||
|
||||
|
||||
# ── JSON recovery (acceptance #9) ───────────────────────────────────────
|
||||
|
||||
def test_json_recovery_roundtrip_clean(tmp_path: Path) -> None:
|
||||
target = tmp_path / "state.json"
|
||||
target.write_text(json.dumps({"a": 1}))
|
||||
assert load_json_with_recovery(target, {}) == {"a": 1}
|
||||
|
||||
|
||||
def test_json_recovery_missing_returns_default(tmp_path: Path) -> None:
|
||||
target = tmp_path / "does-not-exist.json"
|
||||
assert load_json_with_recovery(target, {"fresh": True}) == {"fresh": True}
|
||||
|
||||
|
||||
def test_json_recovery_corruption_creates_backup(tmp_path: Path) -> None:
|
||||
target = tmp_path / "state.json"
|
||||
target.write_text("{ this is not valid json")
|
||||
default = {"ingested": {}}
|
||||
|
||||
result = load_json_with_recovery(target, default)
|
||||
|
||||
# Returned the default.
|
||||
assert result == default
|
||||
# Corrupted file moved aside with .bak-YYYYMMDDTHHMMSSZ suffix.
|
||||
bak_files = list(tmp_path.glob("state.json.bak-*"))
|
||||
assert len(bak_files) == 1, f"expected one .bak file, found {bak_files}"
|
||||
assert re.search(r"bak-\d{8}T\d{6}Z$", bak_files[0].name)
|
||||
# The corrupt content is preserved in the backup.
|
||||
assert bak_files[0].read_text() == "{ this is not valid json"
|
||||
# The main target no longer exists (was moved, not deleted).
|
||||
assert not target.exists()
|
||||
|
||||
|
||||
# ── Wikilink parsing (acceptance #6) ────────────────────────────────────
|
||||
|
||||
def test_extract_wikilinks_bare() -> None:
|
||||
assert extract_wikilinks("see [[concepts/foo]] and [[concepts/bar]]") == [
|
||||
"concepts/foo",
|
||||
"concepts/bar",
|
||||
]
|
||||
|
||||
|
||||
def test_extract_wikilinks_aliased() -> None:
|
||||
assert extract_wikilinks(
|
||||
"ref [[concepts/foo|Foo Display]] and [[concepts/bar|Bar]]"
|
||||
) == ["concepts/foo", "concepts/bar"]
|
||||
|
||||
|
||||
def test_extract_wikilinks_mixed() -> None:
|
||||
assert extract_wikilinks(
|
||||
"[[concepts/a]] and [[concepts/b|B]] and [[concepts/c]]"
|
||||
) == ["concepts/a", "concepts/b", "concepts/c"]
|
||||
|
||||
|
||||
def test_parse_wikilink() -> None:
|
||||
assert parse_wikilink("concepts/foo") == "concepts/foo"
|
||||
assert parse_wikilink("concepts/foo|Display") == "concepts/foo"
|
||||
assert parse_wikilink(" spaces ") == "spaces"
|
||||
|
||||
|
||||
# ── Path traversal (acceptance #8) ──────────────────────────────────────
|
||||
|
||||
def test_safe_path_clean(tmp_path: Path) -> None:
|
||||
base = tmp_path / "kb"
|
||||
base.mkdir()
|
||||
result = safe_article_path("concepts/foo", base)
|
||||
assert result is not None
|
||||
assert result == (base / "concepts" / "foo.md").resolve()
|
||||
|
||||
|
||||
def test_safe_path_traversal_rejected(tmp_path: Path) -> None:
|
||||
base = tmp_path / "kb"
|
||||
base.mkdir()
|
||||
assert safe_article_path("../../etc/passwd", base) is None
|
||||
assert safe_article_path("../outside", base) is None
|
||||
|
||||
|
||||
def test_safe_path_absolute_rejected(tmp_path: Path) -> None:
|
||||
base = tmp_path / "kb"
|
||||
base.mkdir()
|
||||
assert safe_article_path("/etc/passwd", base) is None
|
||||
|
||||
|
||||
def test_safe_path_empty_and_invalid(tmp_path: Path) -> None:
|
||||
base = tmp_path / "kb"
|
||||
base.mkdir()
|
||||
assert safe_article_path("", base) is None
|
||||
assert safe_article_path("foo\0bar", base) is None # null byte
|
||||
|
||||
|
||||
def test_safe_path_strips_alias(tmp_path: Path) -> None:
|
||||
base = tmp_path / "kb"
|
||||
base.mkdir()
|
||||
result = safe_article_path("concepts/foo|Display", base)
|
||||
assert result == (base / "concepts" / "foo.md").resolve()
|
||||
115
tests/test_lint_backlinks.py
Normal file
115
tests/test_lint_backlinks.py
Normal file
|
|
@ -0,0 +1,115 @@
|
|||
"""Tests for lint.py backlink + wikilink behavior.
|
||||
|
||||
Covers audit assertion #6 (aliased wikilinks pass lint cleanly) and
|
||||
the C9 fix (qa/sources don't generate backlink noise on concepts).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def kb_factory(tmp_path: Path, monkeypatch):
|
||||
"""Return a function that sets up a mini knowledge base under tmp_path.
|
||||
|
||||
Yields the knowledge dir; all paths are monkeypatched so lint.py reads
|
||||
from the temp KB.
|
||||
"""
|
||||
kb = tmp_path / "knowledge"
|
||||
kb.mkdir()
|
||||
(kb / "concepts").mkdir()
|
||||
(kb / "connections").mkdir()
|
||||
(kb / "qa").mkdir()
|
||||
|
||||
# Monkeypatch both lint.py's view and utils.py's view (lint uses utils).
|
||||
import config
|
||||
import utils
|
||||
import lint as lint_mod
|
||||
|
||||
monkeypatch.setattr(config, "KNOWLEDGE_DIR", kb)
|
||||
monkeypatch.setattr(config, "CONCEPTS_DIR", kb / "concepts")
|
||||
monkeypatch.setattr(config, "CONNECTIONS_DIR", kb / "connections")
|
||||
monkeypatch.setattr(config, "QA_DIR", kb / "qa")
|
||||
monkeypatch.setattr(utils, "KNOWLEDGE_DIR", kb)
|
||||
monkeypatch.setattr(utils, "CONCEPTS_DIR", kb / "concepts")
|
||||
monkeypatch.setattr(utils, "CONNECTIONS_DIR", kb / "connections")
|
||||
monkeypatch.setattr(utils, "QA_DIR", kb / "qa")
|
||||
monkeypatch.setattr(lint_mod, "KNOWLEDGE_DIR", kb)
|
||||
|
||||
return kb
|
||||
|
||||
|
||||
def test_aliased_wikilinks_are_not_broken(kb_factory: Path) -> None:
|
||||
"""[[concepts/foo|Display]] pointing to an existing article = not broken."""
|
||||
from lint import check_broken_links
|
||||
|
||||
(kb_factory / "concepts" / "foo.md").write_text(
|
||||
"---\ntitle: foo\n---\nLinks to [[concepts/bar]].\n"
|
||||
)
|
||||
(kb_factory / "concepts" / "bar.md").write_text(
|
||||
"---\ntitle: bar\n---\nRefers back to [[concepts/foo|The Foo Article]].\n"
|
||||
)
|
||||
|
||||
issues = check_broken_links()
|
||||
assert issues == [], f"unexpected broken-link issues: {issues}"
|
||||
|
||||
|
||||
def test_aliased_backlink_counts_as_inbound(kb_factory: Path) -> None:
|
||||
"""An aliased link to A counts as a valid inbound link on A."""
|
||||
from lint import check_missing_backlinks
|
||||
|
||||
(kb_factory / "concepts" / "foo.md").write_text(
|
||||
"---\ntitle: foo\n---\nLinks to [[concepts/bar]].\n"
|
||||
)
|
||||
(kb_factory / "concepts" / "bar.md").write_text(
|
||||
"---\ntitle: bar\n---\nAliased backlink: [[concepts/foo|Foo Display]].\n"
|
||||
)
|
||||
|
||||
issues = check_missing_backlinks()
|
||||
# foo links to bar, bar links (aliased) to foo → symmetric → zero.
|
||||
assert issues == [], f"unexpected backlink issues: {issues}"
|
||||
|
||||
|
||||
def test_qa_sources_dont_trigger_backlink_suggestions(kb_factory: Path) -> None:
|
||||
"""QA articles referencing concepts should NOT demand a reciprocal link."""
|
||||
from lint import check_missing_backlinks
|
||||
|
||||
(kb_factory / "concepts" / "foo.md").write_text(
|
||||
"---\ntitle: foo\n---\nJust content.\n"
|
||||
)
|
||||
# Q&A article cites foo; should not produce a backlink suggestion.
|
||||
(kb_factory / "qa" / "how-do-i-use-foo.md").write_text(
|
||||
"---\ntitle: how-do-i-use-foo\n---\nConsulted [[concepts/foo]].\n"
|
||||
)
|
||||
|
||||
issues = check_missing_backlinks()
|
||||
# The QA source is skipped → no missing-backlink suggestion about it.
|
||||
foo_to_qa_suggestions = [
|
||||
i for i in issues
|
||||
if "concepts/foo" in i["detail"] and "qa/how-do-i-use-foo" in i["detail"]
|
||||
]
|
||||
assert foo_to_qa_suggestions == [], (
|
||||
f"qa article should not trigger backlink suggestion: {foo_to_qa_suggestions}"
|
||||
)
|
||||
|
||||
|
||||
def test_concept_to_concept_backlinks_still_checked(kb_factory: Path) -> None:
|
||||
"""The C9 skip is narrow — concept-to-concept asymmetry still reported."""
|
||||
from lint import check_missing_backlinks
|
||||
|
||||
# foo links to bar; bar does NOT link back.
|
||||
(kb_factory / "concepts" / "foo.md").write_text(
|
||||
"---\ntitle: foo\n---\nLinks to [[concepts/bar]].\n"
|
||||
)
|
||||
(kb_factory / "concepts" / "bar.md").write_text(
|
||||
"---\ntitle: bar\n---\nNo links.\n"
|
||||
)
|
||||
|
||||
issues = check_missing_backlinks()
|
||||
assert any(
|
||||
"concepts/foo" in i["detail"] and "concepts/bar" in i["detail"]
|
||||
for i in issues
|
||||
), f"expected concept-to-concept asymmetry report; got {issues}"
|
||||
Loading…
Add table
Add a link
Reference in a new issue