Skip to content

Storage API

Ingest Store

ingest_manifest_db

Law ingest hub orchestration boundary for ingest manifest persistence.

IngestManifestDbStore owns run policy, batching/deduping behavior, lock scope, and unit-of-work coordination across repository primitives.

Classes:

Name Description
IngestManifestDbStore

Application-facing orchestration boundary for ingest persistence.

IngestManifestDbStore dataclass

IngestManifestDbStore(path: Path)

Application-facing orchestration boundary for ingest persistence.

Methods:

Name Description
close

Dispose underlying DB resources eagerly.

commit_act_side_effects

Commit side-effect rowsets in one transaction boundary.

commit_parse_side_effects

Commit parse/checkpoint side effects from runner intent payloads.

finalize_act_build

Finalize one canonical act build in a single unit of work.

finish_run

Mark a previously started run as finished with final status.

load_builder_state_v2

Load compact incremental builder state restart checkpoint.

load_latest_missing_snapshot

Load the most recent missing-interval snapshot metadata and rows.

load_version_states

Return nested map: owner_alias_value -> version_tag -> source hashes.

persist_finalized_build

Persist one finalized canonical build from runner intent payloads.

record_progress_batch

Record act-progress checkpoints with store-owned dedupe policy.

record_variant_events_batch

Record variant-event checkpoints with store-owned dedupe policy.

record_version_states_batch

Record version-state upserts with store-owned dedupe/ID resolution.

save_builder_state_v2

Persist compact incremental builder state for restart checkpoints.

save_missing_snapshot

Persist a missing-interval snapshot and return its meta_id.

start_run

Create and persist an ingest run row, returning its run id.

validate_contract

Validate canonical ingest schema invariants.

close

close() -> None

Dispose underlying DB resources eagerly.

Source code in src/law_graph/storage/ingest_manifest_db.py
def close(self) -> None:
    """Release the underlying database resources immediately.

    Delegates to the repository, which disposes its pooled connections.
    """

    repository = self._repository
    repository.close()

commit_act_side_effects

commit_act_side_effects(
    *,
    source: str,
    owner_alias_kind: str,
    run_id: str,
    progress_rows: list[tuple[str, IngestActProgressRowV2]]
    | None = None,
    variant_event_rows: list[
        tuple[str, IngestVariantEventRowV2]
    ]
    | None = None,
    version_state_rows: list[
        tuple[str, str, dict[str, str], str]
    ]
    | None = None,
) -> None

Commit side-effect rowsets in one transaction boundary.

Source code in src/law_graph/storage/ingest_manifest_db.py
def commit_act_side_effects(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    run_id: str,
    progress_rows: list[tuple[str, IngestActProgressRowV2]] | None = None,
    variant_event_rows: list[tuple[str, IngestVariantEventRowV2]] | None = None,
    version_state_rows: list[tuple[str, str, dict[str, str], str]] | None = None,
) -> None:
    """Persist all provided side-effect rowsets inside one transaction.

    No-op when every rowset is empty or ``None``; otherwise all writes
    share a single session and are committed together.
    """

    has_work = bool(progress_rows) or bool(variant_event_rows) or bool(version_state_rows)
    if not has_work:
        return
    with self._lock, Session(self._repository.engine) as session:
        if variant_event_rows:
            self.record_variant_events_batch(
                source=source,
                owner_alias_kind=owner_alias_kind,
                run_id=run_id,
                rows=variant_event_rows,
                session=session,
            )
        if version_state_rows:
            self.record_version_states_batch(
                source=source,
                owner_alias_kind=owner_alias_kind,
                rows=version_state_rows,
                session=session,
            )
        if progress_rows:
            self.record_progress_batch(
                source=source,
                owner_alias_kind=owner_alias_kind,
                run_id=run_id,
                rows=progress_rows,
                session=session,
            )
        session.commit()

commit_parse_side_effects

commit_parse_side_effects(
    *,
    source: str,
    owner_alias_kind: str,
    run_id: str,
    progress_checkpoints: list[
        IngestProgressCheckpointInput
    ]
    | None = None,
    variant_event_checkpoints: list[
        IngestVariantEventCheckpointInput
    ]
    | None = None,
    version_state_checkpoints: list[
        IngestVersionStateCheckpointInput
    ]
    | None = None,
) -> None

Commit parse/checkpoint side effects from runner intent payloads.

Source code in src/law_graph/storage/ingest_manifest_db.py
def commit_parse_side_effects(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    run_id: str,
    progress_checkpoints: list[IngestProgressCheckpointInput] | None = None,
    variant_event_checkpoints: list[IngestVariantEventCheckpointInput] | None = None,
    version_state_checkpoints: list[IngestVersionStateCheckpointInput] | None = None,
) -> None:
    """Translate runner checkpoint payloads into rowsets and commit them.

    Each checkpoint list is mapped to the row shape expected by
    ``commit_act_side_effects``; empty or ``None`` inputs are forwarded
    as ``None`` so the downstream call can skip them.
    """

    progress_rows = None
    if progress_checkpoints:
        progress_rows = []
        for checkpoint in progress_checkpoints:
            progress_rows.append(
                (
                    checkpoint.owner_alias_value,
                    IngestActProgressRowV2(
                        phase=checkpoint.phase,
                        last_variant_id=checkpoint.last_variant_id,
                        variants_total=checkpoint.variants_total,
                        variants_done=checkpoint.variants_done,
                        status=checkpoint.status,
                    ),
                )
            )

    variant_rows = None
    if variant_event_checkpoints:
        variant_rows = []
        for checkpoint in variant_event_checkpoints:
            variant_rows.append(
                (
                    checkpoint.owner_alias_value,
                    IngestVariantEventRowV2(
                        variant_id=checkpoint.variant_id,
                        version_tag=checkpoint.version_tag,
                        parse_status=checkpoint.parse_status,
                        variant_fingerprint=checkpoint.variant_fingerprint,
                        skip_reason=checkpoint.skip_reason,
                        metrics=checkpoint.metrics,
                    ),
                )
            )

    version_rows = None
    if version_state_checkpoints:
        # Version-state rows are plain tuples; the current run id is
        # attached so downstream dedupe can resolve the owning run.
        version_rows = [
            (cp.owner_alias_value, cp.version_tag, dict(cp.source_hashes), run_id)
            for cp in version_state_checkpoints
        ]

    self.commit_act_side_effects(
        source=source,
        owner_alias_kind=owner_alias_kind,
        run_id=run_id,
        progress_rows=progress_rows,
        variant_event_rows=variant_rows,
        version_state_rows=version_rows,
    )

finalize_act_build

finalize_act_build(
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    run_id: str,
    urn_nir: str,
    version_tag: str,
    input_versions: list[str],
    interval_node_count: int,
    content_blob_count: int,
    original_payloads: dict[str, Any] | None,
    variant_diagnostic: IngestBuildVariantDiagnosticRow
    | None = None,
    node_intervals: list[IngestNodeIntervalRow],
    content_blobs: list[IngestContentBlobRow],
    metadata_deltas: list[IngestMetadataDeltaRow],
) -> int

Finalize one canonical act build in a single unit of work.

Source code in src/law_graph/storage/ingest_manifest_db.py
def finalize_act_build(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    run_id: str,
    urn_nir: str,
    version_tag: str,
    input_versions: list[str],
    interval_node_count: int,
    content_blob_count: int,
    original_payloads: dict[str, Any] | None,
    variant_diagnostic: IngestBuildVariantDiagnosticRow | None = None,
    node_intervals: list[IngestNodeIntervalRow],
    content_blobs: list[IngestContentBlobRow],
    metadata_deltas: list[IngestMetadataDeltaRow],
) -> int:
    """Persist one canonical act build atomically and return its build id.

    Content blobs not referenced by any node interval are dropped before
    persistence, and the stored blob count reflects the filtered set —
    the ``content_blob_count`` argument is intentionally ignored.
    """

    _ = content_blob_count  # superseded by the filtered count below

    # Keep only blobs that at least one interval actually points at.
    wanted_ids = {
        interval.content_version_id
        for interval in node_intervals
        if interval.content_version_id is not None
    }
    kept_blobs = [blob for blob in content_blobs if blob.content_version_id in wanted_ids]

    payloads_json = None
    if original_payloads is not None:
        payloads_json = json.dumps(original_payloads, ensure_ascii=False)

    with self._lock, Session(self._repository.engine) as session:
        act_id = self._resolve_act_id(urn_nir=urn_nir, run_id=run_id, session=session)
        source_ref_id = self._resolve_source_ref_id(
            source=source,
            owner_alias_kind=owner_alias_kind,
            owner_alias_value=owner_alias_value,
            run_id=run_id,
            session=session,
        )
        self._repository.map_source_to_act(
            source_ref_id=source_ref_id,
            act_id=act_id,
            run_id=run_id,
            session=session,
        )
        build_id = self._repository.upsert_act_build_raw(
            act_id=act_id,
            version_tag=version_tag,
            run_id=run_id,
            input_versions_json=json.dumps(input_versions, ensure_ascii=False),
            interval_node_count=interval_node_count,
            content_blob_count=len(kept_blobs),
            original_payloads_json=payloads_json,
            session=session,
        )
        self._repository.replace_act_build_rows_raw(
            build_id=build_id,
            urn_nir=urn_nir,
            variant_diagnostic=variant_diagnostic,
            node_intervals=node_intervals,
            content_blobs=kept_blobs,
            metadata_deltas=metadata_deltas,
            session=session,
        )
        session.commit()
        return build_id

finish_run

finish_run(
    *,
    run_id: str,
    status: str,
    skip_stats: dict[str, int] | None = None,
) -> None

Mark a previously started run as finished with final status.

Source code in src/law_graph/storage/ingest_manifest_db.py
def finish_run(self, *, run_id: str, status: str, skip_stats: dict[str, int] | None = None) -> None:
    """Mark a previously started run as finished with final status."""

    with self._lock:
        self._repository.finish_run(run_id=run_id, status=status, skip_stats=skip_stats)
        self._source_ref_cache_by_run.pop(run_id, None)
        self._act_id_cache_by_run.pop(run_id, None)

load_builder_state_v2

load_builder_state_v2(
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    run_id: str,
) -> dict[str, Any] | None

Load compact incremental builder state restart checkpoint.

Source code in src/law_graph/storage/ingest_manifest_db.py
def load_builder_state_v2(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    run_id: str,
) -> dict[str, Any] | None:
    """Load compact incremental builder state restart checkpoint."""

    with self._lock:
        return self._repository.load_builder_state_v2(
            source=source,
            owner_alias_kind=owner_alias_kind,
            owner_alias_value=owner_alias_value,
            run_id=run_id,
        )

load_latest_missing_snapshot

load_latest_missing_snapshot() -> tuple[
    MissingSnapshotMetadata | None,
    dict[
        SourceOwnerAliasKey, list[IngestMissingIntervalRow]
    ],
]

Load the most recent missing-interval snapshot metadata and rows.

Source code in src/law_graph/storage/ingest_manifest_db.py
def load_latest_missing_snapshot(
    self,
) -> tuple[MissingSnapshotMetadata | None, dict[SourceOwnerAliasKey, list[IngestMissingIntervalRow]]]:
    """Return the newest missing-interval snapshot as (metadata, grouped rows).

    Returns ``(None, {})`` when no snapshot has been persisted yet.
    """

    with self._lock:
        meta_row, grouped_rows = self._repository.fetch_latest_missing_snapshot()
    if meta_row is None:
        return None, {}

    # The ingest-manifest digest is optional; only build it when present.
    ingest_digest = None
    if meta_row.ingest_manifest_path:
        ingest_digest = MissingSnapshotDigest(
            path=meta_row.ingest_manifest_path,
            sha256=meta_row.ingest_manifest_sha256,
        )
    metadata = MissingSnapshotMetadata(
        download_manifest=MissingSnapshotDigest(
            path=meta_row.download_manifest_path,
            sha256=meta_row.download_manifest_sha256,
        ),
        ingest_manifest=ingest_digest,
        acts_total=meta_row.acts_total,
        acts_with_missing=meta_row.acts_with_missing,
        missing_intervals_total=meta_row.missing_intervals_total,
        acts_expected=meta_row.acts_expected,
        complete=bool(meta_row.complete),
        schema_version=meta_row.schema_version,
        generated_at=meta_row.generated_at,
    )
    # Re-key the raw tuples into typed SourceOwnerAliasKey entries.
    grouped = {
        SourceOwnerAliasKey(
            source=source,
            owner_alias_kind=owner_alias_kind,
            owner_alias_value=owner_alias_value,
        ): rows
        for (source, owner_alias_kind, owner_alias_value), rows in grouped_rows.items()
    }
    return metadata, grouped

load_version_states

load_version_states(
    *, source: str, owner_alias_kind: str
) -> dict[str, dict[str, dict[str, str]]]

Return nested map: owner_alias_value -> version_tag -> source hashes.

Source code in src/law_graph/storage/ingest_manifest_db.py
def load_version_states(
    self,
    *,
    source: str,
    owner_alias_kind: str,
) -> dict[str, dict[str, dict[str, str]]]:
    """Fetch the owner_alias_value -> version_tag -> source-hash mapping."""

    with self._lock:
        states = self._repository.fetch_version_states(
            source=source,
            owner_alias_kind=owner_alias_kind,
        )
    return states

persist_finalized_build

persist_finalized_build(
    *,
    source: str,
    owner_alias_kind: str,
    run_id: str,
    payload: IngestFinalizedBuildPayloadInput,
) -> int

Persist one finalized canonical build from runner intent payloads.

Source code in src/law_graph/storage/ingest_manifest_db.py
def persist_finalized_build(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    run_id: str,
    payload: IngestFinalizedBuildPayloadInput,
) -> int:
    """Adapt a runner intent payload and delegate to ``finalize_act_build``."""

    # Coerce the loosely-typed payload sections into the row types the
    # finalize path expects before handing over.
    diagnostic = self._coerce_variant_diagnostic(payload.variant_diagnostic)
    intervals = self._coerce_node_interval_rows(payload.node_intervals)
    blobs = self._coerce_content_blob_rows(payload.content_blobs)
    deltas = self._coerce_metadata_delta_rows(payload.metadata_deltas)
    return self.finalize_act_build(
        source=source,
        owner_alias_kind=owner_alias_kind,
        owner_alias_value=payload.owner_alias_value,
        run_id=run_id,
        urn_nir=payload.urn_nir,
        version_tag=payload.version_tag,
        input_versions=list(payload.input_versions),
        interval_node_count=payload.interval_node_count,
        content_blob_count=payload.content_blob_count,
        original_payloads=payload.original_payloads,
        variant_diagnostic=diagnostic,
        node_intervals=intervals,
        content_blobs=blobs,
        metadata_deltas=deltas,
    )

record_progress_batch

record_progress_batch(
    *,
    source: str,
    owner_alias_kind: str,
    run_id: str,
    rows: list[tuple[str, IngestActProgressRowV2]],
    session: Session | None = None,
) -> None

Record act-progress checkpoints with store-owned dedupe policy.

Source code in src/law_graph/storage/ingest_manifest_db.py
def record_progress_batch(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    run_id: str,
    rows: list[tuple[str, IngestActProgressRowV2]],
    session: Session | None = None,
) -> None:
    """Upsert act-progress checkpoint rows, deduping per store policy.

    Without an explicit ``session`` this opens its own locked session and
    commits; with one, the writes join the caller's transaction uncommitted.
    """

    if session is None:
        # Self-managed transaction: re-enter with a fresh session under the lock.
        with self._lock, Session(self._repository.engine) as own_session:
            self.record_progress_batch(
                source=source,
                owner_alias_kind=owner_alias_kind,
                run_id=run_id,
                rows=rows,
                session=own_session,
            )
            own_session.commit()
        return
    if not rows:
        return

    deduped = self._dedupe_progress_rows(rows)
    timestamp = self._repository.now_iso()
    upserts: list[dict[str, Any]] = []
    for owner_alias_value, progress in deduped:
        source_ref_id = self._resolve_source_ref_id(
            source=source,
            owner_alias_kind=owner_alias_kind,
            owner_alias_value=owner_alias_value,
            run_id=run_id,
            session=session,
        )
        upserts.append(
            {
                "run_id": run_id,
                "source_ref_id": source_ref_id,
                "phase": progress.phase,
                "last_variant_id": progress.last_variant_id,
                "variants_total": progress.variants_total,
                "variants_done": progress.variants_done,
                "status": progress.status,
                "started_at": timestamp,
                "updated_at": timestamp,
            }
        )
    self._repository.upsert_act_progress_rows_bulk_raw(rows=upserts, session=session)

record_variant_events_batch

record_variant_events_batch(
    *,
    source: str,
    owner_alias_kind: str,
    run_id: str,
    rows: list[tuple[str, IngestVariantEventRowV2]],
    session: Session | None = None,
) -> None

Record variant-event checkpoints with store-owned dedupe policy.

Source code in src/law_graph/storage/ingest_manifest_db.py
def record_variant_events_batch(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    run_id: str,
    rows: list[tuple[str, IngestVariantEventRowV2]],
    session: Session | None = None,
) -> None:
    """Upsert variant-event checkpoint rows, deduping per store policy.

    Without an explicit ``session`` this opens its own locked session and
    commits; with one, the writes join the caller's transaction uncommitted.
    """

    if session is None:
        # Self-managed transaction: re-enter with a fresh session under the lock.
        with self._lock, Session(self._repository.engine) as own_session:
            self.record_variant_events_batch(
                source=source,
                owner_alias_kind=owner_alias_kind,
                run_id=run_id,
                rows=rows,
                session=own_session,
            )
            own_session.commit()
        return
    if not rows:
        return

    deduped = self._dedupe_variant_rows(rows)
    created_at = self._repository.now_iso()
    upserts: list[dict[str, Any]] = []
    for owner_alias_value, event_row in deduped:
        source_ref_id = self._resolve_source_ref_id(
            source=source,
            owner_alias_kind=owner_alias_kind,
            owner_alias_value=owner_alias_value,
            run_id=run_id,
            session=session,
        )
        metrics_json = None
        if event_row.metrics is not None:
            metrics_json = json.dumps(event_row.metrics, ensure_ascii=False)
        upserts.append(
            {
                "run_id": run_id,
                "source_ref_id": source_ref_id,
                "variant_id": event_row.variant_id,
                "version_tag": event_row.version_tag,
                "parse_status": event_row.parse_status,
                "variant_fingerprint": event_row.variant_fingerprint,
                "skip_reason": event_row.skip_reason,
                "metrics_json": metrics_json,
                "created_at": created_at,
            }
        )
    self._repository.upsert_variant_event_rows_bulk_raw(rows=upserts, session=session)

record_version_states_batch

record_version_states_batch(
    *,
    source: str,
    owner_alias_kind: str,
    rows: list[tuple[str, str, dict[str, str], str]],
    session: Session | None = None,
) -> None

Record version-state upserts with store-owned dedupe/ID resolution.

Source code in src/law_graph/storage/ingest_manifest_db.py
def record_version_states_batch(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    rows: list[tuple[str, str, dict[str, str], str]],
    session: Session | None = None,
) -> None:
    """Upsert version-state rows, deduping and resolving IDs in the store.

    Each row is ``(owner_alias_value, version_tag, source_hashes, run_id)``.
    Without an explicit ``session`` this opens its own locked session and
    commits; with one, the writes join the caller's transaction uncommitted.
    """

    if session is None:
        # Self-managed transaction: re-enter with a fresh session under the lock.
        with self._lock, Session(self._repository.engine) as own_session:
            self.record_version_states_batch(
                source=source,
                owner_alias_kind=owner_alias_kind,
                rows=rows,
                session=own_session,
            )
            own_session.commit()
        return
    if not rows:
        return

    deduped = self._dedupe_version_rows(rows)
    built_at = self._repository.now_iso()
    upserts: list[dict[str, Any]] = []
    for owner_alias_value, version_tag, source_hashes, run_id in deduped:
        source_ref_id = self._resolve_source_ref_id(
            source=source,
            owner_alias_kind=owner_alias_kind,
            owner_alias_value=owner_alias_value,
            run_id=run_id,
            session=session,
        )
        upserts.append(
            {
                "source_ref_id": source_ref_id,
                "version_tag": version_tag,
                "hash_json": source_hashes.get("json"),
                "hash_akn": source_hashes.get("akn"),
                "hash_nir": source_hashes.get("nir"),
                "built_at": built_at,
                "last_run_id": run_id,
            }
        )
    self._repository.upsert_version_states_bulk_raw(rows=upserts, session=session)

save_builder_state_v2

save_builder_state_v2(
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    run_id: str,
    state: dict[str, Any],
) -> None

Persist compact incremental builder state for restart checkpoints.

Source code in src/law_graph/storage/ingest_manifest_db.py
def save_builder_state_v2(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    run_id: str,
    state: dict[str, Any],
) -> None:
    """Write the compact builder-state checkpoint used for restarts."""

    with self._lock:
        source_ref_id = self._resolve_source_ref_id(
            source=source,
            owner_alias_kind=owner_alias_kind,
            owner_alias_value=owner_alias_value,
            run_id=run_id,
        )
        state_json = json.dumps(state, ensure_ascii=False)
        self._repository.upsert_builder_state_raw(
            run_id=run_id,
            source_ref_id=source_ref_id,
            state_json=state_json,
            updated_at=self._repository.now_iso(),
        )

save_missing_snapshot

save_missing_snapshot(
    *,
    run_id: str,
    source: str,
    owner_alias_kind: str,
    metadata: MissingSnapshotMetadata,
    hints_by_source_owner_alias: dict[
        SourceOwnerAliasKey,
        list[IngestMissingIntervalRow | dict[str, Any]],
    ],
) -> int

Persist a missing-interval snapshot and return its meta_id.

Source code in src/law_graph/storage/ingest_manifest_db.py
def save_missing_snapshot(
    self,
    *,
    run_id: str,
    source: str,
    owner_alias_kind: str,
    metadata: MissingSnapshotMetadata,
    hints_by_source_owner_alias: dict[
        SourceOwnerAliasKey,
        list[IngestMissingIntervalRow | dict[str, Any]],
    ],
) -> int:
    """Persist a missing-interval snapshot and return its ``meta_id``.

    The metadata row and all interval rows are written in one locked
    transaction, so a partially written snapshot is never committed.

    Raises:
        ValueError: If a key in ``hints_by_source_owner_alias`` carries a
            ``source`` or ``owner_alias_kind`` different from the arguments.
    """

    with self._lock:
        with Session(self._repository.engine) as session:
            # Insert the metadata row first: interval rows reference meta_id.
            meta_id = self._repository.insert_missing_snapshot_meta_raw(
                run_id=run_id,
                generated_at=metadata.generated_at,
                acts_total=metadata.acts_total,
                acts_with_missing=metadata.acts_with_missing,
                missing_intervals_total=metadata.missing_intervals_total,
                acts_expected=metadata.acts_expected,
                complete=metadata.complete,
                download_manifest_path=metadata.download_manifest.path,
                download_manifest_sha256=metadata.download_manifest.sha256,
                # The ingest-manifest digest is optional on the metadata object.
                ingest_manifest_path=metadata.ingest_manifest.path if metadata.ingest_manifest else None,
                ingest_manifest_sha256=metadata.ingest_manifest.sha256 if metadata.ingest_manifest else None,
                schema_version=metadata.schema_version,
                session=session,
            )
            payload: list[dict[str, Any]] = []
            for key, raw_intervals in hints_by_source_owner_alias.items():
                # Reject mixed payloads: every key must agree with the
                # snapshot's declared source and alias kind.
                if key.source != source:
                    raise ValueError(f"save_missing_snapshot source mismatch: expected={source} got={key.source}")
                if key.owner_alias_kind != owner_alias_kind:
                    raise ValueError(
                        "save_missing_snapshot owner_alias_kind mismatch: "
                        f"expected={owner_alias_kind} got={key.owner_alias_kind}"
                    )
                source_ref_id = self._resolve_source_ref_id(
                    source=source,
                    owner_alias_kind=owner_alias_kind,
                    owner_alias_value=key.owner_alias_value,
                    run_id=run_id,
                    session=session,
                )
                # Denormalize act_id (when the source ref is already mapped
                # to an act) so interval rows carry it directly.
                source_ref = session.get(IngestSourceRef, source_ref_id)
                act_id = (
                    int(source_ref.act_id) if source_ref is not None and source_ref.act_id is not None else None
                )

                for raw_interval in raw_intervals:
                    # Hints may arrive as row objects or plain dicts; the
                    # coercion helper normalizes both shapes.
                    interval = self._coerce_missing_interval_row(raw_interval)
                    payload.append(
                        {
                            "meta_id": meta_id,
                            "source_ref_id": source_ref_id,
                            "version_tag": None,
                            "act_id": act_id,
                            "urn_nir": interval.urn_nir or None,
                            "node_type": interval.node_type,
                            "node_id": interval.node_id,
                            "start_date": interval.start_date,
                            "end_date": interval.end_date,
                            "reason": interval.reason,
                        }
                    )
            self._repository.insert_missing_interval_rows_bulk_raw(rows=payload, session=session)
            session.commit()
            return meta_id

start_run

start_run(
    *,
    params: dict[str, Any],
    download_manifest_path: Path,
    download_manifest_sha256: str | None,
) -> str

Create and persist an ingest run row, returning its run id.

Source code in src/law_graph/storage/ingest_manifest_db.py
def start_run(
    self,
    *,
    params: dict[str, Any],
    download_manifest_path: Path,
    download_manifest_sha256: str | None,
) -> str:
    """Create and persist an ingest run row, returning its run id."""

    with self._lock:
        return self._repository.start_run(
            params=params,
            download_manifest_path=download_manifest_path,
            download_manifest_sha256=download_manifest_sha256,
        )

validate_contract

validate_contract() -> None

Validate canonical ingest schema invariants.

Raises:

Type Description
RuntimeError

If schema version table is missing or stale.

Source code in src/law_graph/storage/ingest_manifest_db.py
def validate_contract(self) -> None:
    """Validate canonical ingest schema invariants.

    Checks, in order: the stored schema version equals
    ``INGEST_SCHEMA_VERSION``, two migration-sensitive columns exist, and
    no duplicate ``(act_id, version_tag)`` build rows are present.

    Raises
    ------
    RuntimeError
        If schema version table is missing or stale, or any schema
        invariant is violated.
    """

    with self._lock:
        with self._repository.engine.connect() as conn:
            # Raw SQL retained for lightweight invariant probes used by CI
            # smoke checks and migration verification.
            row = conn.exec_driver_sql(
                "SELECT value FROM ig_schema_meta WHERE key = 'schema_version' LIMIT 1"
            ).scalar_one_or_none()
            if row is None:
                raise RuntimeError("ingest schema version missing")
            if str(row) != INGEST_SCHEMA_VERSION:
                raise RuntimeError(f"ingest schema version mismatch: expected={INGEST_SCHEMA_VERSION} got={row}")
            # Column probes guard against running against a DB that has not
            # received the relevant migrations.
            has_interval_row_id = self._table_has_column(
                conn,
                table_name="ig_missing_interval",
                column_name="interval_row_id",
            )
            if not has_interval_row_id:
                raise RuntimeError("ingest schema invariant violation: ig_missing_interval.interval_row_id missing")
            has_source_ref_id = self._table_has_column(
                conn,
                table_name="ig_version_state",
                column_name="source_ref_id",
            )
            if not has_source_ref_id:
                raise RuntimeError("ingest schema invariant violation: ig_version_state.source_ref_id missing")
            # Uniqueness probe: each (act_id, version_tag) pair may have at
            # most one build row.
            duplicate_build_keys = int(
                conn.exec_driver_sql(
                    """
                    SELECT COUNT(*)
                    FROM (
                      SELECT act_id, version_tag
                      FROM ig_act_build
                      GROUP BY act_id, version_tag
                      HAVING COUNT(*) > 1
                    )
                    """
                ).scalar_one()
            )
            if duplicate_build_keys != 0:
                raise RuntimeError("ingest schema invariant violation: duplicate act_id/version_tag builds")

Ingest Repository

ingest_repository

SQL primitive repository for ingest manifest DB operations.

This module owns table-oriented persistence primitives only. Application-level orchestration policy (locking, batching/deduping strategy, retries, and unit of work decisions across multiple primitives) belongs in IngestManifestDbStore.

Classes:

Name Description
IngestRepository

Repository for ingest manifest state and canonical ingest rows.

IngestRepository

IngestRepository(db_path: Path)

Repository for ingest manifest state and canonical ingest rows.

Methods:

Name Description
close

Dispose pooled DB connections eagerly.

ensure_act

Resolve or create canonical act hub row.

ensure_source_ref

Resolve or create a source publication reference row.

fetch_latest_missing_snapshot

Fetch latest snapshot metadata and grouped interval rows.

fetch_version_states

Fetch nested owner_alias_value -> version_tag -> source_hashes map.

find_source_ref_id

Resolve a source publication reference id without creating rows.

finish_run

Update run status and finished timestamp.

insert_missing_interval_rows_bulk_raw

Insert normalized ig_missing_interval rows.

insert_missing_snapshot_meta_raw

Insert one ig_missing_interval_meta row and return meta_id.

load_builder_state_v2

Load compact incremental builder state spill row.

map_source_to_act

Upsert resolved source->act mapping row.

now_iso

Return current UTC timestamp in ISO-8601.

replace_act_build_rows_raw

Replace build-dependent rowsets for one canonical build id.

start_run

Insert ingest run row and return run id.

upsert_act_build_raw

Upsert normalized ig_act_build row and return build_id.

upsert_act_progress_rows_bulk_raw

Upsert normalized ig_act_progress_v2 rows keyed by IDs.

upsert_builder_state_raw

Upsert one normalized ig_builder_state_v2 row.

upsert_variant_event_rows_bulk_raw

Upsert normalized ig_variant_event_v2 rows keyed by IDs.

upsert_version_states_bulk_raw

Upsert normalized ig_version_state rows keyed by resolved IDs.

Attributes:

Name Type Description
engine Engine

Expose SQLModel engine for low-level checks/migrations.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def __init__(self, db_path: Path) -> None:
    """Open (or create) the SQLite ingest-manifest DB at ``db_path``.

    Creates the engine, registers per-connection foreign-key enforcement,
    applies durability/performance PRAGMAs, and creates any missing
    ingest tables.
    """
    self._engine: Engine = create_engine(
        f"sqlite:///{db_path}",
        # Allow pooled connections to be used from multiple threads;
        # SQLite's own locking still serializes writers.
        connect_args={"check_same_thread": False},
    )
    # SQLite disables foreign keys per connection by default; re-enable
    # them on every new pooled connection via the "connect" event.
    event.listen(self._engine, "connect", self._enable_sqlite_foreign_keys)
    with self._engine.connect() as conn:
        # WAL lets readers proceed while one writer is active;
        # synchronous=FULL favors durability over write throughput.
        conn.exec_driver_sql("PRAGMA journal_mode=WAL")
        conn.exec_driver_sql("PRAGMA synchronous=FULL")
        conn.exec_driver_sql("PRAGMA cache_size=-64000")  # 64 MB
        conn.exec_driver_sql("PRAGMA mmap_size=268435456")  # 256 MB
        conn.commit()
    # Create only the ingest-related tables; other metadata registered on
    # SQLModel elsewhere in the process is deliberately left untouched.
    SQLModel.metadata.create_all(
        self._engine,
        tables=[
            IngestRun.__table__,
            IngestAct.__table__,
            IngestSourceRef.__table__,
            IngestVersionState.__table__,
            IngestActBuild.__table__,
            IngestBuildVariantDiagnostic.__table__,
            IngestActProgressV2.__table__,
            IngestVariantEventV2.__table__,
            IngestBuilderStateV2.__table__,
            IngestNodeInterval.__table__,
            IngestContentBlob.__table__,
            IngestMetadataDelta.__table__,
            IngestMissingIntervalMeta.__table__,
            IngestMissingInterval.__table__,
        ],
    )

engine property

engine: Engine

Expose SQLModel engine for low-level checks/migrations.

close

close() -> None

Dispose pooled DB connections eagerly.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def close(self) -> None:
    """Dispose the SQLAlchemy engine, closing all pooled connections now."""

    self._engine.dispose()

ensure_act

ensure_act(
    *, urn_nir: str, session: Session | None = None
) -> int

Resolve or create canonical act hub row.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def ensure_act(self, *, urn_nir: str, session: Session | None = None) -> int:
    """Return the ``act_id`` for ``urn_nir``, inserting a hub row when absent."""

    urn = self._require_non_empty(urn_nir, field_name="urn_nir")
    with self._write_session(session=session) as db:
        act = db.exec(select(IngestAct).where(IngestAct.urn_nir == urn)).first()
        if act is None:
            # Not present yet: insert and flush so the autoincrement id is assigned.
            act = IngestAct(urn_nir=urn)
            db.add(act)
            db.flush()
        return self._require_persisted_id(act.act_id, field_name="act_id")

ensure_source_ref

ensure_source_ref(
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    session: Session | None = None,
) -> int

Resolve or create a source publication reference row.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def ensure_source_ref(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    session: Session | None = None,
) -> int:
    """Return the ``source_ref_id`` for the (source, kind, value) triple, creating it if needed."""

    src = self._require_non_empty(source, field_name="source")
    kind = self._require_non_empty(owner_alias_kind, field_name="owner_alias_kind")
    value = self._require_non_empty(owner_alias_value, field_name="owner_alias_value")
    with self._write_session(session=session) as db:
        lookup = (
            select(IngestSourceRef)
            .where(IngestSourceRef.source == src)
            .where(IngestSourceRef.owner_alias_kind == kind)
            .where(IngestSourceRef.owner_alias_value == value)
        )
        ref = db.exec(lookup).first()
        if ref is None:
            # First sighting of this publication reference: insert and flush
            # so the autoincrement id becomes available.
            ref = IngestSourceRef(
                source=src,
                owner_alias_kind=kind,
                owner_alias_value=value,
            )
            db.add(ref)
            db.flush()
        return self._require_persisted_id(ref.source_ref_id, field_name="source_ref_id")

fetch_latest_missing_snapshot

fetch_latest_missing_snapshot() -> tuple[
    IngestMissingIntervalMeta | None,
    dict[
        tuple[str, str, str], list[IngestMissingIntervalRow]
    ],
]

Fetch latest snapshot metadata and grouped interval rows.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def fetch_latest_missing_snapshot(
    self,
) -> tuple[IngestMissingIntervalMeta | None, dict[tuple[str, str, str], list[IngestMissingIntervalRow]]]:
    """Fetch latest snapshot metadata and grouped interval rows.

    Returns:
        A ``(metadata, grouped)`` pair. ``metadata`` is the snapshot row
        with the highest ``meta_id`` (or ``None`` when no snapshot
        exists). ``grouped`` maps ``(source, owner_alias_kind,
        owner_alias_value)`` to that snapshot's interval rows, ordered by
        owner alias, URN, node type, and node id.
    """

    with Session(self._engine) as session:
        # The newest snapshot is selected as the row with the highest meta_id.
        metadata = session.exec(
            select(IngestMissingIntervalMeta).order_by(desc(IngestMissingIntervalMeta.meta_id)).limit(1)
        ).first()
        if metadata is None or metadata.meta_id is None:
            return None, {}
        # Join each interval row to its source ref so grouping keys and
        # owner aliases are available without extra queries.
        rows = session.exec(
            select(IngestMissingInterval, IngestSourceRef)
            .join(IngestSourceRef, IngestMissingInterval.source_ref_id == IngestSourceRef.source_ref_id)
            .where(IngestMissingInterval.meta_id == metadata.meta_id)
            .order_by(
                IngestSourceRef.owner_alias_value,
                IngestMissingInterval.urn_nir,
                IngestMissingInterval.node_type,
                IngestMissingInterval.node_id,
            )
        ).all()

    # Group by publication identity; per-group ordering is inherited from
    # the SQL ORDER BY above.
    grouped: dict[tuple[str, str, str], list[IngestMissingIntervalRow]] = {}
    for row, source_ref in rows:
        key = (
            source_ref.source,
            source_ref.owner_alias_kind,
            source_ref.owner_alias_value,
        )
        grouped.setdefault(key, []).append(
            IngestMissingIntervalRow(
                owner_alias_value=source_ref.owner_alias_value,
                urn_nir=row.urn_nir or "",  # normalize NULL URN to empty string
                node_type=row.node_type,
                node_id=row.node_id,
                start_date=row.start_date,
                end_date=row.end_date,
                reason=row.reason,
            )
        )
    return metadata, grouped

fetch_version_states

fetch_version_states(
    *, source: str, owner_alias_kind: str
) -> dict[str, dict[str, dict[str, str]]]

Fetch nested owner_alias_value -> version_tag -> source_hashes map.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def fetch_version_states(
    self,
    *,
    source: str,
    owner_alias_kind: str,
) -> dict[str, dict[str, dict[str, str]]]:
    """Fetch nested ``owner_alias_value -> version_tag -> source_hashes`` map."""

    nested: dict[str, dict[str, dict[str, str]]] = {}
    with self._read_session(session=None) as db:
        pairs = db.exec(
            select(IngestVersionState, IngestSourceRef)
            .join(
                IngestSourceRef,
                IngestVersionState.source_ref_id == IngestSourceRef.source_ref_id,
            )
            .where(IngestSourceRef.source == source)
            .where(IngestSourceRef.owner_alias_kind == owner_alias_kind)
        ).all()
    for state, ref in pairs:
        # Keep only the hash columns that are actually populated.
        hashes = {
            label: digest
            for label, digest in (
                ("json", state.hash_json),
                ("akn", state.hash_akn),
                ("nir", state.hash_nir),
            )
            if digest
        }
        nested.setdefault(ref.owner_alias_value, {})[state.version_tag] = hashes
    return nested

find_source_ref_id

find_source_ref_id(
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    session: Session | None = None,
) -> int | None

Resolve a source publication reference id without creating rows.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def find_source_ref_id(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    session: Session | None = None,
) -> int | None:
    """Look up an existing source publication reference id; never inserts."""

    src = self._require_non_empty(source, field_name="source")
    kind = self._require_non_empty(owner_alias_kind, field_name="owner_alias_kind")
    value = self._require_non_empty(owner_alias_value, field_name="owner_alias_value")
    with self._read_session(session=session) as db:
        ref = db.exec(
            select(IngestSourceRef)
            .where(IngestSourceRef.source == src)
            .where(IngestSourceRef.owner_alias_kind == kind)
            .where(IngestSourceRef.owner_alias_value == value)
        ).first()
        if ref is None:
            return None
        return self._require_persisted_id(ref.source_ref_id, field_name="source_ref_id")

finish_run

finish_run(
    *,
    run_id: str,
    status: str,
    skip_stats: dict[str, int] | None = None,
) -> None

Update run status and finished timestamp.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def finish_run(self, *, run_id: str, status: str, skip_stats: dict[str, int] | None = None) -> None:
    """Stamp a run row with its final status and finish time.

    Silently returns when ``run_id`` is unknown. When ``skip_stats`` is
    provided, it is merged into the run's ``params_json`` under the
    ``skip_stats`` key (with keys sorted and values coerced to int).
    """

    if not is_valid_ingest_status(status):
        raise ValueError(f"Unsupported ingest run status: {status}")
    with Session(self._engine) as session:
        run_row = session.get(IngestRun, run_id)
        if run_row is None:
            return
        if skip_stats:
            # Tolerate malformed or non-dict params_json by starting fresh.
            try:
                decoded = json.loads(run_row.params_json)
            except json.JSONDecodeError:
                decoded = None
            params: dict[str, Any] = decoded if isinstance(decoded, dict) else {}
            params["skip_stats"] = {name: int(count) for name, count in sorted(skip_stats.items())}
            run_row.params_json = json.dumps(params, ensure_ascii=False)
        run_row.finished_at = self.now_iso()
        run_row.status = status
        session.add(run_row)
        session.commit()

insert_missing_interval_rows_bulk_raw

insert_missing_interval_rows_bulk_raw(
    *,
    rows: list[dict[str, Any]],
    session: Session | None = None,
) -> None

Insert normalized ig_missing_interval rows.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def insert_missing_interval_rows_bulk_raw(
    self,
    *,
    rows: list[dict[str, Any]],
    session: Session | None = None,
) -> None:
    """Bulk-insert normalized ``ig_missing_interval`` rows; no-op on empty input."""

    if rows:
        with self._write_session(session=session) as db:
            db.execute(IngestMissingInterval.__table__.insert(), rows)

insert_missing_snapshot_meta_raw

insert_missing_snapshot_meta_raw(
    *,
    run_id: str,
    generated_at: str,
    acts_total: int,
    acts_with_missing: int,
    missing_intervals_total: int,
    acts_expected: int | None,
    complete: bool,
    download_manifest_path: str,
    download_manifest_sha256: str | None,
    ingest_manifest_path: str | None,
    ingest_manifest_sha256: str | None,
    schema_version: str,
    session: Session | None = None,
) -> int

Insert one ig_missing_interval_meta row and return meta_id.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def insert_missing_snapshot_meta_raw(
    self,
    *,
    run_id: str,
    generated_at: str,
    acts_total: int,
    acts_with_missing: int,
    missing_intervals_total: int,
    acts_expected: int | None,
    complete: bool,
    download_manifest_path: str,
    download_manifest_sha256: str | None,
    ingest_manifest_path: str | None,
    ingest_manifest_sha256: str | None,
    schema_version: str,
    session: Session | None = None,
) -> int:
    """Insert one ``ig_missing_interval_meta`` row and return its ``meta_id``."""

    meta = IngestMissingIntervalMeta(
        run_id=run_id,
        generated_at=generated_at,
        acts_total=acts_total,
        acts_with_missing=acts_with_missing,
        missing_intervals_total=missing_intervals_total,
        acts_expected=acts_expected,
        complete=int(complete),  # stored as a 0/1 integer flag
        download_manifest_path=download_manifest_path,
        download_manifest_sha256=download_manifest_sha256,
        ingest_manifest_path=ingest_manifest_path,
        ingest_manifest_sha256=ingest_manifest_sha256,
        schema_version=schema_version,
    )
    with self._write_session(session=session) as db:
        db.add(meta)
        db.flush()  # assigns the autoincrement primary key
        if meta.meta_id is None:
            raise RuntimeError("Missing meta_id after snapshot insert")
        return int(meta.meta_id)

load_builder_state_v2

load_builder_state_v2(
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    run_id: str,
) -> dict[str, Any] | None

Load compact incremental builder state spill row.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def load_builder_state_v2(
    self,
    *,
    source: str,
    owner_alias_kind: str,
    owner_alias_value: str,
    run_id: str,
) -> dict[str, Any] | None:
    """Load compact incremental builder state spill row.

    Returns ``None`` when the source ref is unknown, no state row exists
    for this run, or the stored JSON is malformed / not an object.
    """

    ref_id = self.find_source_ref_id(
        source=source,
        owner_alias_kind=owner_alias_kind,
        owner_alias_value=owner_alias_value,
    )
    if ref_id is None:
        return None
    with Session(self._engine) as session:
        state_row = session.exec(
            select(IngestBuilderStateV2)
            .where(IngestBuilderStateV2.run_id == run_id)
            .where(IngestBuilderStateV2.source_ref_id == ref_id)
        ).first()
    if state_row is None:
        return None
    try:
        decoded = json.loads(state_row.state_json)
    except json.JSONDecodeError:
        return None
    if not isinstance(decoded, dict):
        return None
    return decoded

map_source_to_act

map_source_to_act(
    *,
    source_ref_id: int,
    act_id: int,
    run_id: str | None,
    confidence: float | None = None,
    evidence_json: str | None = None,
    session: Session | None = None,
) -> None

Upsert resolved source->act mapping row.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def map_source_to_act(
    self,
    *,
    source_ref_id: int,
    act_id: int,
    run_id: str | None,
    confidence: float | None = None,
    evidence_json: str | None = None,
    session: Session | None = None,
) -> None:
    """Record the resolved source->act mapping on the source ref row.

    Raises:
        ValueError: If ``source_ref_id`` does not exist.
    """

    with self._write_session(session=session) as db:
        ref = db.get(IngestSourceRef, source_ref_id)
        if ref is None:
            raise ValueError(f"Unknown source_ref_id={source_ref_id}")
        # Overwrite the resolution fields in place on the existing row.
        ref.act_id = act_id
        ref.resolved_run_id = run_id
        ref.confidence = confidence
        ref.evidence_json = evidence_json
        db.add(ref)

now_iso staticmethod

now_iso() -> str

Return current UTC timestamp in ISO-8601.

Source code in src/law_graph/storage/repositories/ingest_repository.py
@staticmethod
def now_iso() -> str:
    """Return current UTC timestamp in ISO-8601."""

    return datetime.now(tz=UTC).isoformat()

replace_act_build_rows_raw

replace_act_build_rows_raw(
    *,
    build_id: int,
    urn_nir: str,
    variant_diagnostic: IngestBuildVariantDiagnosticRow
    | None,
    node_intervals: list[IngestNodeIntervalRow],
    content_blobs: list[IngestContentBlobRow],
    metadata_deltas: list[IngestMetadataDeltaRow],
    session: Session | None = None,
) -> None

Replace build-dependent rowsets for one canonical build id.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def replace_act_build_rows_raw(
    self,
    *,
    build_id: int,
    urn_nir: str,
    variant_diagnostic: IngestBuildVariantDiagnosticRow | None,
    node_intervals: list[IngestNodeIntervalRow],
    content_blobs: list[IngestContentBlobRow],
    metadata_deltas: list[IngestMetadataDeltaRow],
    session: Session | None = None,
) -> None:
    """Replace build-dependent rowsets for one canonical build id.

    Deletes every existing dependent row for ``build_id`` (node intervals,
    content blobs, metadata deltas, variant diagnostic) and re-inserts the
    supplied rowsets within the same write session — a full replacement,
    not a merge.

    Args:
        build_id: Canonical build whose dependent rows are replaced.
        urn_nir: Act URN stamped onto interval and content-blob rows.
        variant_diagnostic: Optional single diagnostic row for the build.
        node_intervals: Interval rows to insert.
        content_blobs: Content payload rows to insert.
        metadata_deltas: Metadata delta rows to insert.
        session: Optional externally-managed session to join.
    """

    with self._write_session(session=session) as active_session:
        # Clear all four dependent tables first so the inserts below
        # fully define the build's rowsets.
        active_session.exec(delete(IngestNodeInterval).where(IngestNodeInterval.build_id == build_id))
        active_session.exec(delete(IngestContentBlob).where(IngestContentBlob.build_id == build_id))
        active_session.exec(delete(IngestMetadataDelta).where(IngestMetadataDelta.build_id == build_id))
        active_session.exec(
            delete(IngestBuildVariantDiagnostic).where(IngestBuildVariantDiagnostic.build_id == build_id)
        )

        if variant_diagnostic is not None:
            # Diagnostic flags are stored as 0/1 integers; provenance is
            # JSON-encoded for storage.
            active_session.add(
                IngestBuildVariantDiagnostic(
                    build_id=build_id,
                    has_conflict=1 if variant_diagnostic.has_conflict else 0,
                    variant_count=variant_diagnostic.variant_count,
                    provenance_json=json.dumps(variant_diagnostic.provenance, ensure_ascii=False),
                )
            )

        if node_intervals:
            # Bulk insert via the core table for speed; nested structures
            # (observations, legal updates) are JSON-encoded per row.
            active_session.execute(
                IngestNodeInterval.__table__.insert(),
                [
                    {
                        "build_id": build_id,
                        "urn_nir": urn_nir,
                        "node_type": row.node_type,
                        "node_id": row.node_id,
                        "interval_id": row.interval_id,
                        "start_date": row.start_date,
                        "end_date": row.end_date,
                        "content_version_id": row.content_version_id,
                        "missing_reason": row.missing_reason,
                        "observations_json": json.dumps(row.observations, ensure_ascii=False),
                        "legal_updates_json": json.dumps(row.legal_updates, ensure_ascii=False),
                    }
                    for row in node_intervals
                ],
            )

        if content_blobs:
            active_session.execute(
                IngestContentBlob.__table__.insert(),
                [
                    {
                        "build_id": build_id,
                        "urn_nir": urn_nir,
                        "content_version_id": row.content_version_id,
                        "node_type": row.node_type,
                        "payload_json": json.dumps(row.payload, ensure_ascii=False),
                    }
                    for row in content_blobs
                ],
            )

        if metadata_deltas:
            # set/unset payloads are nullable; only JSON-encode when present.
            active_session.execute(
                IngestMetadataDelta.__table__.insert(),
                [
                    {
                        "build_id": build_id,
                        "urn_nir": row.urn_nir,
                        "effective_date": row.effective_date,
                        "set_json": (
                            json.dumps(row.set_values, ensure_ascii=False) if row.set_values is not None else None
                        ),
                        "unset_json": (
                            json.dumps(row.unset_values, ensure_ascii=False)
                            if row.unset_values is not None
                            else None
                        ),
                    }
                    for row in metadata_deltas
                ],
            )

start_run

start_run(
    *,
    params: dict[str, Any],
    download_manifest_path: Path,
    download_manifest_sha256: str | None,
) -> str

Insert ingest run row and return run id.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def start_run(
    self,
    *,
    params: dict[str, Any],
    download_manifest_path: Path,
    download_manifest_sha256: str | None,
) -> str:
    """Create a new ingest run row in ``running`` state and return its id."""

    new_run_id = uuid.uuid4().hex
    # Store the manifest path resolved and POSIX-normalized so it is
    # stable across platforms.
    manifest_path = download_manifest_path.resolve(strict=False).as_posix()
    run_row = IngestRun(
        run_id=new_run_id,
        started_at=self.now_iso(),
        status="running",
        params_json=json.dumps(params, ensure_ascii=False),
        download_manifest_sha256=download_manifest_sha256,
        download_manifest_path=manifest_path,
    )
    with Session(self._engine) as session:
        session.add(run_row)
        session.commit()
    return new_run_id

upsert_act_build_raw

upsert_act_build_raw(
    *,
    act_id: int,
    version_tag: str,
    run_id: str,
    input_versions_json: str,
    interval_node_count: int,
    content_blob_count: int,
    original_payloads_json: str | None,
    session: Session | None = None,
) -> int

Upsert normalized ig_act_build row and return build_id.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def upsert_act_build_raw(
    self,
    *,
    act_id: int,
    version_tag: str,
    run_id: str,
    input_versions_json: str,
    interval_node_count: int,
    content_blob_count: int,
    original_payloads_json: str | None,
    session: Session | None = None,
) -> int:
    """Upsert normalized ``ig_act_build`` row and return ``build_id``.

    Conflicts on ``(act_id, version_tag)`` update the build's mutable
    columns in place, so ``build_id`` stays stable across re-runs.
    """

    values = {
        "act_id": act_id,
        "version_tag": version_tag,
        "run_id": run_id,
        "input_versions_json": input_versions_json,
        "interval_node_count": interval_node_count,
        "content_blob_count": content_blob_count,
        "original_payloads_json": original_payloads_json,
    }
    insert_stmt = sqlite_insert(IngestActBuild.__table__).values(values)
    updatable = (
        "run_id",
        "input_versions_json",
        "interval_node_count",
        "content_blob_count",
        "original_payloads_json",
    )
    upsert = insert_stmt.on_conflict_do_update(
        index_elements=["act_id", "version_tag"],
        set_={column: getattr(insert_stmt.excluded, column) for column in updatable},
    )
    with self._write_session(session=session) as db:
        db.execute(upsert)
        # Re-select the id: the upsert may have updated an existing row.
        resolved_build_id = db.exec(
            select(IngestActBuild.build_id)
            .where(IngestActBuild.act_id == act_id)
            .where(IngestActBuild.version_tag == version_tag)
        ).first()
        if resolved_build_id is None:
            raise RuntimeError("Missing build_id after act build upsert")
        return int(resolved_build_id)

upsert_act_progress_rows_bulk_raw

upsert_act_progress_rows_bulk_raw(
    *,
    rows: list[dict[str, Any]],
    session: Session | None = None,
) -> None

Upsert normalized ig_act_progress_v2 rows keyed by IDs.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def upsert_act_progress_rows_bulk_raw(
    self,
    *,
    rows: list[dict[str, Any]],
    session: Session | None = None,
) -> None:
    """Upsert normalized ``ig_act_progress_v2`` rows keyed by ``(run_id, source_ref_id)``."""

    if not rows:
        return
    insert_stmt = sqlite_insert(IngestActProgressV2.__table__).values(rows)
    # Columns refreshed when a progress row for this run/source already exists.
    updatable = (
        "phase",
        "last_variant_id",
        "variants_total",
        "variants_done",
        "status",
        "updated_at",
    )
    upsert = insert_stmt.on_conflict_do_update(
        index_elements=["run_id", "source_ref_id"],
        set_={column: getattr(insert_stmt.excluded, column) for column in updatable},
    )
    with self._write_session(session=session) as db:
        db.execute(upsert)

upsert_builder_state_raw

upsert_builder_state_raw(
    *,
    run_id: str,
    source_ref_id: int,
    state_json: str,
    updated_at: str,
    session: Session | None = None,
) -> None

Upsert one normalized ig_builder_state_v2 row.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def upsert_builder_state_raw(
    self,
    *,
    run_id: str,
    source_ref_id: int,
    state_json: str,
    updated_at: str,
    session: Session | None = None,
) -> None:
    """Upsert one normalized ``ig_builder_state_v2`` row keyed by ``(run_id, source_ref_id)``."""

    insert_stmt = sqlite_insert(IngestBuilderStateV2.__table__).values(
        {
            "run_id": run_id,
            "source_ref_id": source_ref_id,
            "state_json": state_json,
            "updated_at": updated_at,
        }
    )
    upsert = insert_stmt.on_conflict_do_update(
        index_elements=["run_id", "source_ref_id"],
        # Only the state payload and its timestamp change on conflict.
        set_={column: getattr(insert_stmt.excluded, column) for column in ("state_json", "updated_at")},
    )
    with self._write_session(session=session) as db:
        db.execute(upsert)

upsert_variant_event_rows_bulk_raw

upsert_variant_event_rows_bulk_raw(
    *,
    rows: list[dict[str, Any]],
    session: Session | None = None,
) -> None

Upsert normalized ig_variant_event_v2 rows keyed by IDs.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def upsert_variant_event_rows_bulk_raw(
    self,
    *,
    rows: list[dict[str, Any]],
    session: Session | None = None,
) -> None:
    """Upsert normalized ``ig_variant_event_v2`` rows keyed by ``(run_id, source_ref_id, variant_id)``."""

    if not rows:
        return
    insert_stmt = sqlite_insert(IngestVariantEventV2.__table__).values(rows)
    # Columns refreshed when an event row with the same key already exists.
    updatable = (
        "version_tag",
        "parse_status",
        "variant_fingerprint",
        "skip_reason",
        "metrics_json",
        "created_at",
    )
    upsert = insert_stmt.on_conflict_do_update(
        index_elements=["run_id", "source_ref_id", "variant_id"],
        set_={column: getattr(insert_stmt.excluded, column) for column in updatable},
    )
    with self._write_session(session=session) as db:
        db.execute(upsert)

upsert_version_states_bulk_raw

upsert_version_states_bulk_raw(
    *,
    rows: list[dict[str, Any]],
    session: Session | None = None,
) -> None

Upsert normalized ig_version_state rows keyed by resolved IDs.

Source code in src/law_graph/storage/repositories/ingest_repository.py
def upsert_version_states_bulk_raw(
    self,
    *,
    rows: list[dict[str, Any]],
    session: Session | None = None,
) -> None:
    """Upsert normalized ``ig_version_state`` rows keyed by ``(source_ref_id, version_tag)``."""

    if not rows:
        return
    insert_stmt = sqlite_insert(IngestVersionState.__table__).values(rows)
    # Hash columns and bookkeeping fields refreshed on conflict.
    updatable = (
        "hash_json",
        "hash_akn",
        "hash_nir",
        "built_at",
        "last_run_id",
    )
    upsert = insert_stmt.on_conflict_do_update(
        index_elements=["source_ref_id", "version_tag"],
        set_={column: getattr(insert_stmt.excluded, column) for column in updatable},
    )
    with self._write_session(session=session) as db:
        db.execute(upsert)

Download Repository

download_repository

SQL primitives and schema lifecycle for download manifest persistence

Classes:

Name Description
DownloadRepository

Repository owning schema setup and table-oriented download primitives.

DownloadRepository

DownloadRepository(db_path: Path)

Repository owning schema setup and table-oriented download primitives.

Methods:

Name Description
close

Dispose pooled DB connections eagerly.

count_artifact_items

Count distinct artifact roots currently tracked in the manifest.

count_missing_artifact_paths

Count manifest file paths that are missing on disk under root_dir.

create_constraints

Create schema indexes and compatibility tables required by the store.

create_views

Create and refresh SQL views used by coverage/reporting queries.

fetch_artifact_rows

Yield joined artifact rows used to assemble artifact inventories.

fetch_coverage_rows

Fetch denormalized act coverage rows for reporting and export views.

get_any_path_for_sha256

Return one manifest path for a given blob hash, if present.

scan_missing_artifact_paths

Scan manifest file paths for on-disk presence.

validate_schema_contract

Validate required download-manifest columns and nullability constraints.

Attributes:

Name Type Description
engine Engine

Return the SQLAlchemy engine backing this repository.

Source code in src/law_graph/storage/repositories/download_repository.py
def __init__(self, db_path: Path) -> None:
    """Open (or create) the download-manifest SQLite DB at ``db_path``.

    Creates the engine, applies durability/performance PRAGMAs, creates
    any missing download-manifest tables, then applies indexes and
    compatibility tables, validates the schema contract, and (re)creates
    reporting views.
    """
    self._engine: Engine = create_engine(
        f"sqlite:///{db_path}",
        # Allow pooled connections to be used from multiple threads.
        connect_args={"check_same_thread": False},
    )
    with self._engine.connect() as conn:
        # WAL lets readers proceed alongside a single writer;
        # synchronous=FULL favors durability over write throughput.
        conn.exec_driver_sql("PRAGMA journal_mode=WAL")
        conn.exec_driver_sql("PRAGMA synchronous=FULL")
        conn.exec_driver_sql("PRAGMA cache_size=-64000")  # 64 MB page cache
        conn.exec_driver_sql("PRAGMA mmap_size=268435456")  # 256 MB memory map
        conn.commit()
    SQLModel.metadata.create_all(
        self._engine,
        tables=[
            ActCatalog.__table__,
            ActAlias.__table__,
            SourceRef.__table__,
            Checkpoint.__table__,
            ArtifactBlob.__table__,
            ActFile.__table__,
            DownloadRun.__table__,
            DownloadEvent.__table__,
        ],
    )
    # Indexes/compat tables first, then contract validation, then the
    # views that query the finished schema.
    self.create_constraints()
    self.validate_schema_contract()
    self.create_views()

engine property

engine: Engine

Return the SQLAlchemy engine backing this repository.

close

close() -> None

Dispose pooled DB connections eagerly.

Source code in src/law_graph/storage/repositories/download_repository.py
def close(self) -> None:
    """Shut down the engine's connection pool immediately."""

    self._engine.dispose()

count_artifact_items

count_artifact_items() -> int

Count distinct artifact roots currently tracked in the manifest.

Source code in src/law_graph/storage/repositories/download_repository.py
def count_artifact_items(self) -> int:
    """Count distinct artifact roots currently tracked in the manifest.

    An artifact root is the path segment before the first ``/`` (or the
    whole path when it contains no separator).
    """

    query = """
            SELECT COUNT(
                DISTINCT CASE
                    WHEN instr(path, '/') > 0
                        THEN substr(path, 1, instr(path, '/') - 1)
                    ELSE path
                END
            )
            FROM dm_act_file
            """
    with self._engine.connect() as conn:
        first_row = conn.exec_driver_sql(query).first()
    if first_row is None or first_row[0] is None:
        return 0
    return int(first_row[0])

count_missing_artifact_paths

count_missing_artifact_paths(*, root_dir: Path) -> int

Count manifest file paths that are missing on disk under root_dir.

Source code in src/law_graph/storage/repositories/download_repository.py
def count_missing_artifact_paths(self, *, root_dir: Path) -> int:
    """Count manifest file paths that are missing on disk under ``root_dir``."""

    # Delegate to the full scan and keep only the missing-count component.
    missing_count, _details = self.scan_missing_artifact_paths(root_dir=root_dir)
    return missing_count

create_constraints

create_constraints() -> None

Create schema indexes and compatibility tables required by the store.

Source code in src/law_graph/storage/repositories/download_repository.py
def create_constraints(self) -> None:
    """Create schema indexes and compatibility tables required by the store.

    Idempotent: every statement uses ``IF NOT EXISTS`` or is guarded by a
    PRAGMA inspection, so it is safe on both fresh and existing databases.
    Also migrates a legacy ``dm_download_ambiguity`` layout whose columns
    were named ``act_pk`` / ``candidate_act_pks_json`` to the current
    ``act_id`` naming.
    """

    with self._engine.connect() as conn:
        # Plain lookup index over manifest file paths.
        conn.exec_driver_sql("CREATE INDEX IF NOT EXISTS ix_dm_act_file_path ON dm_act_file(path)")
        # Partial unique indexes: uniqueness is enforced only where the
        # identifier is actually present (NULLs stay unconstrained).
        conn.exec_driver_sql(
            "CREATE UNIQUE INDEX IF NOT EXISTS ux_dm_act_catalog_canonical_urn_nir ON dm_act_catalog(canonical_urn_nir) WHERE canonical_urn_nir IS NOT NULL"
        )
        conn.exec_driver_sql(
            "CREATE UNIQUE INDEX IF NOT EXISTS ux_dm_act_catalog_primary_eli_id ON dm_act_catalog(primary_eli_id) WHERE primary_eli_id IS NOT NULL"
        )
        # One alias value per (kind, value) pair across the whole catalog.
        conn.exec_driver_sql(
            "CREATE UNIQUE INDEX IF NOT EXISTS ux_dm_act_alias_kind_value ON dm_act_alias(kind, value)"
        )
        # Each (source, owner alias) identity maps to a single source ref.
        conn.exec_driver_sql(
            "CREATE UNIQUE INDEX IF NOT EXISTS ux_dm_source_ref_source_owner_alias ON dm_source_ref(source, owner_alias_kind, owner_alias_value)"
        )
        # Compatibility table tracking download-time ambiguities; act_id is
        # nullable because an ambiguity can be recorded before any act row
        # is attributed. UNIQUE(act_id, reason) dedupes repeat reports.
        conn.exec_driver_sql(
            """
            CREATE TABLE IF NOT EXISTS dm_download_ambiguity (
              ambiguity_id INTEGER PRIMARY KEY,
              act_id INTEGER REFERENCES dm_act_catalog(act_id),
              run_id TEXT REFERENCES dm_download_run(run_id),
              reason TEXT NOT NULL,
              observed_keys_json TEXT NOT NULL DEFAULT '[]',
              candidate_act_ids_json TEXT NOT NULL DEFAULT '[]',
              created_at TEXT NOT NULL,
              updated_at TEXT NOT NULL,
              resolved INTEGER NOT NULL DEFAULT 0,
              UNIQUE(act_id, reason)
            )
            """
        )
        # Detect the legacy column layout via PRAGMA (row[1] is the column
        # name in SQLite's table_info output).
        ambiguity_columns = {
            str(row[1]) for row in conn.exec_driver_sql("PRAGMA table_info(dm_download_ambiguity)").fetchall()
        }
        if "act_id" not in ambiguity_columns and "act_pk" in ambiguity_columns:
            # Legacy schema found: rebuild via the rename/copy/drop pattern
            # rather than ALTER ... RENAME COLUMN, keeping row ids intact.
            conn.exec_driver_sql("ALTER TABLE dm_download_ambiguity RENAME TO dm_download_ambiguity_old")
            conn.exec_driver_sql(
                """
                CREATE TABLE dm_download_ambiguity (
                  ambiguity_id INTEGER PRIMARY KEY,
                  act_id INTEGER REFERENCES dm_act_catalog(act_id),
                  run_id TEXT REFERENCES dm_download_run(run_id),
                  reason TEXT NOT NULL,
                  observed_keys_json TEXT NOT NULL DEFAULT '[]',
                  candidate_act_ids_json TEXT NOT NULL DEFAULT '[]',
                  created_at TEXT NOT NULL,
                  updated_at TEXT NOT NULL,
                  resolved INTEGER NOT NULL DEFAULT 0,
                  UNIQUE(act_id, reason)
                )
                """
            )
            # Copy rows, mapping the legacy act_pk / candidate_act_pks_json
            # columns onto the new act_id / candidate_act_ids_json names.
            conn.exec_driver_sql(
                """
                INSERT INTO dm_download_ambiguity(
                  ambiguity_id,
                  act_id,
                  run_id,
                  reason,
                  observed_keys_json,
                  candidate_act_ids_json,
                  created_at,
                  updated_at,
                  resolved
                )
                SELECT
                  ambiguity_id,
                  act_pk,
                  run_id,
                  reason,
                  observed_keys_json,
                  candidate_act_pks_json,
                  created_at,
                  updated_at,
                  resolved
                FROM dm_download_ambiguity_old
                """
            )
            conn.exec_driver_sql("DROP TABLE dm_download_ambiguity_old")
        # Secondary indexes created after any rebuild so they always target
        # the current-layout table.
        conn.exec_driver_sql(
            "CREATE INDEX IF NOT EXISTS ix_dm_download_ambiguity_act_id ON dm_download_ambiguity(act_id)"
        )
        conn.exec_driver_sql(
            "CREATE INDEX IF NOT EXISTS ix_dm_download_ambiguity_run_id ON dm_download_ambiguity(run_id)"
        )
        conn.exec_driver_sql(
            "CREATE INDEX IF NOT EXISTS ix_dm_act_file_source_ref_id ON dm_act_file(source_ref_id)"
        )
        conn.exec_driver_sql(
            "CREATE INDEX IF NOT EXISTS ix_dm_download_event_source_ref_id ON dm_download_event(source_ref_id)"
        )
        conn.commit()

create_views

create_views() -> None

Create and refresh SQL views used by coverage/reporting queries.

Source code in src/law_graph/storage/repositories/download_repository.py
def create_views(self) -> None:
    """Create and refresh SQL views used by coverage/reporting queries."""

    # Drop the legacy view name, drop the current one, then recreate it —
    # executed in order so the view always reflects the latest definition.
    view_statements = (
        "DROP VIEW IF EXISTS act_coverage",
        "DROP VIEW IF EXISTS act_file_coverage",
        """
    CREATE VIEW act_file_coverage AS
    SELECT
                a.act_id AS act_id,
      a.data_pubblicazione AS data_pubblicazione,
      a.data_emanazione AS data_emanazione,
      a.denominazione_atto AS denominazione_atto,
      a.numero_provvedimento AS numero_provvedimento,
      a.titolo AS titolo,
      COUNT(DISTINCT CASE WHEN f.mode = 'O' THEN f.file_id ELSE NULL END) AS files_originario,
      COUNT(DISTINCT CASE WHEN f.mode = 'V' THEN f.file_id ELSE NULL END) AS files_vigente,
      COUNT(DISTINCT CASE WHEN f.mode = 'M' THEN f.file_id ELSE NULL END) AS files_multivigente,
      MAX(CASE WHEN f.mode = 'V' THEN f.last_seen ELSE NULL END) AS last_seen_vigente
    FROM dm_act_catalog a
    LEFT JOIN dm_act_file f ON f.act_id = a.act_id
    GROUP BY a.act_id;
    """,
    )
    with self._engine.connect() as conn:
        for statement in view_statements:
            conn.exec_driver_sql(statement)
        conn.commit()

fetch_artifact_rows

fetch_artifact_rows(
    *, order_by_path: bool
) -> Iterator[
    tuple[ActFile, ArtifactBlob, SourceRef | None]
]

Yield joined artifact rows used to assemble artifact inventories.

Source code in src/law_graph/storage/repositories/download_repository.py
def fetch_artifact_rows(
    self,
    *,
    order_by_path: bool,
) -> Iterator[tuple[ActFile, ArtifactBlob, SourceRef | None]]:
    """Yield joined artifact rows used to assemble artifact inventories.

    Joins each file to its blob and (optionally present) source ref; when
    ``order_by_path`` is true, rows come back sorted by manifest path.
    """

    # Building the query is pure expression assembly; only session.exec
    # touches the database.
    query = (
        select(ActFile, ArtifactBlob, SourceRef)
        .join(ArtifactBlob, ActFile.blob_id == ArtifactBlob.blob_id)
        .outerjoin(SourceRef, ActFile.source_ref_id == SourceRef.source_ref_id)
    )
    if order_by_path:
        query = query.order_by(ActFile.path.asc())
    with Session(self._engine) as session:
        for act_file, blob, source_ref in session.exec(query):
            yield act_file, blob, source_ref

fetch_coverage_rows

fetch_coverage_rows(
    *,
    owner_alias_kind: str | None,
    urn_alias_kind: str,
    shard_mod: int | None,
    shard_rem: int | None,
) -> list[dict[str, Any]]

Fetch denormalized act coverage rows for reporting and export views.

Source code in src/law_graph/storage/repositories/download_repository.py
def fetch_coverage_rows(
    self,
    *,
    owner_alias_kind: str | None,
    urn_alias_kind: str,
    shard_mod: int | None,
    shard_rem: int | None,
) -> list[dict[str, Any]]:
    """Fetch denormalized act coverage rows for reporting and export views."""

    with self._engine.connect() as conn:
        rows = (
            conn.exec_driver_sql(
                """
            SELECT
              c.*,
              ur.urn AS urn,
              own.owner_alias AS owner_alias
            FROM act_file_coverage c
            LEFT JOIN (
                SELECT act_id, MAX(value) AS urn
                FROM dm_act_alias
                WHERE kind = :urn_kind
                GROUP BY act_id
            ) ur ON ur.act_id = c.act_id
            LEFT JOIN (
                SELECT act_id, MAX(value) AS owner_alias
                FROM dm_act_alias
                WHERE :owner_alias_kind IS NOT NULL
                  AND kind = :owner_alias_kind
                GROUP BY act_id
            ) own ON own.act_id = c.act_id
            WHERE (:shard_mod IS NULL OR (c.act_id % :shard_mod) = :shard_rem)
            ORDER BY c.act_id
            """,
                {
                    "owner_alias_kind": owner_alias_kind,
                    "urn_kind": urn_alias_kind,
                    "shard_mod": shard_mod,
                    "shard_rem": shard_rem,
                },
            )
            .mappings()
            .all()
        )
    return [dict(row) for row in rows]

get_any_path_for_sha256

get_any_path_for_sha256(*, sha256: str) -> str | None

Return one manifest path for a given blob hash, if present.

Source code in src/law_graph/storage/repositories/download_repository.py
def get_any_path_for_sha256(self, *, sha256: str) -> str | None:
    """Return one manifest path for a given blob hash, if present.

    The hash is normalized (trimmed, lowercased); the earliest-seen file
    row for that blob wins.
    """

    normalized = sha256.strip().lower()
    if not normalized:
        return None
    statement = (
        select(ActFile.path)
        .join(ArtifactBlob, ActFile.blob_id == ArtifactBlob.blob_id)
        .where(ArtifactBlob.sha256 == normalized)
        .order_by(ActFile.first_seen.asc())
        .limit(1)
    )
    with Session(self._engine) as session:
        hit = session.exec(statement).first()
    return str(hit) if hit else None

scan_missing_artifact_paths

scan_missing_artifact_paths(
    *, root_dir: Path, max_checks: int | None = None
) -> tuple[int, int]

Scan manifest file paths for on-disk presence.

Returns:

Type Description
tuple[int, int]

A tuple of (missing_count, checked_count).

Source code in src/law_graph/storage/repositories/download_repository.py
def scan_missing_artifact_paths(
    self,
    *,
    root_dir: Path,
    max_checks: int | None = None,
) -> tuple[int, int]:
    """Scan manifest file paths for on-disk presence.

    Returns
    -------
    tuple[int, int]
        A tuple of ``(missing_count, checked_count)``.
    """

    # A non-positive cap means "check nothing".
    if max_checks is not None and max_checks <= 0:
        return 0, 0

    base = root_dir.resolve()
    missing_count = 0
    checked_count = 0
    with Session(self._engine) as session:
        query = select(ActFile.path).order_by(ActFile.file_id.asc())
        if max_checks is not None:
            query = query.limit(max_checks)
        for rel_path in session.exec(query):
            checked_count += 1
            if not (base / str(rel_path)).exists():
                missing_count += 1
    return missing_count, checked_count

validate_schema_contract

validate_schema_contract() -> None

Validate required download-manifest columns and nullability constraints.

Source code in src/law_graph/storage/repositories/download_repository.py
def validate_schema_contract(self) -> None:
    """Validate required download-manifest columns and nullability constraints.

    Inspects each manifest table via ``PRAGMA table_info`` and verifies the
    columns the current code depends on are present, plus that
    ``dm_download_ambiguity.act_id`` is nullable.

    Raises
    ------
    RuntimeError
        If a required column is missing or has the wrong nullability; the
        message names the migration script that repairs the schema.
    """

    def _require_columns(conn, table: str, required: set[str], migration: str) -> None:
        # One PRAGMA round-trip per table; row[1] is the column name in
        # SQLite's table_info output.
        rows = conn.exec_driver_sql(f"PRAGMA table_info({table})").fetchall()
        present = {str(row[1]) for row in rows}
        missing = sorted(required - present)
        if missing:
            raise RuntimeError(
                f"download manifest schema mismatch: {table} missing columns "
                f"{missing}. Run {migration}."
            )

    schema_migration = "backend/scripts/migrations/migrate_download_manifest_schema.py"
    with self._engine.connect() as conn:
        _require_columns(
            conn, "dm_act_catalog", {"canonical_urn_nir", "primary_eli_id"}, schema_migration
        )
        _require_columns(conn, "dm_act_alias", {"source", "evidence_json"}, schema_migration)
        _require_columns(conn, "dm_act_file", {"metadata_json", "source_ref_id"}, schema_migration)
        _require_columns(conn, "dm_download_event", {"urn_nir", "source_ref_id"}, schema_migration)
        _require_columns(
            conn,
            "dm_source_ref",
            {
                "source_ref_id",
                "source",
                "owner_alias_kind",
                "owner_alias_value",
            },
            "backend/scripts/migrations/migrate_source_ref_alias_identity.py",
        )

        # dm_download_ambiguity.act_id must exist and be nullable: row[3]
        # (the "notnull" flag) has to be 0 so ambiguities can be recorded
        # before an act row is attributed.
        ambiguity_columns = conn.exec_driver_sql("PRAGMA table_info(dm_download_ambiguity)").fetchall()
        act_id_not_null: bool | None = None
        for row in ambiguity_columns:
            if str(row[1]) == "act_id":
                act_id_not_null = int(row[3]) == 1
                break
        if act_id_not_null is None:
            raise RuntimeError(
                "download manifest schema mismatch: dm_download_ambiguity.act_id missing. "
                "Run backend/scripts/migrations/migrate_download_manifest_schema.py."
            )
        if act_id_not_null:
            raise RuntimeError(
                "download manifest schema mismatch: dm_download_ambiguity.act_id is NOT NULL. "
                "Run backend/scripts/migrations/migrate_download_manifest_schema.py."
            )