Skip to content

Delta API

Builder

builder

Canonical incremental builder contracts and engine.

Classes:

Name Description
ActSnapshot

One ingested snapshot for an act version tag.

VariantDescriptor

Deterministic identity for one source variant.

ActBuilder

Apply/finalize engine for parsed variant events.

Functions:

Name Description
build_canonical_records

Build canonical record rows and metadata deltas from snapshots.

build_variant_event

Project one parsed Act payload into canonical incremental event contracts.

ActSnapshot dataclass

ActSnapshot(
    version_tag: str,
    expression_date: date | None,
    act: Act,
    metadata: dict[str, JsonValue] | None = None,
)

One ingested snapshot for an act version tag.

VariantDescriptor dataclass

VariantDescriptor(
    source: str,
    manifest_id: str,
    eli_id: str | None,
    version_tag: str,
    variant_id: str,
    variant_sha: str,
    source_hashes: dict[str, str],
    paths: VariantPaths,
)

Deterministic identity for one source variant.

ActBuilder

ActBuilder()

Apply/finalize engine for parsed variant events.

Methods:

Name Description
apply

Apply one parsed variant event to builder state.

finalize

Finalize state into canonical row payloads.

Source code in src/law_graph/delta/builder.py
def __init__(self) -> None:
    """Initialize the builder with a fresh, empty accumulation state."""
    self.state = BuilderState()

apply

apply(event: VariantEvent) -> None

Apply one parsed variant event to builder state.

Source code in src/law_graph/delta/builder.py
def apply(self, event: VariantEvent) -> None:
    """Apply one parsed variant event to builder state.

    Mutates ``self.state`` in place: counts skips, captures the first
    observed ``urn_nir``, merges node/interval timelines, registers content
    blobs, and folds metadata changes into the effective-date timeline.

    Args:
        event: One parsed variant event (see ``build_variant_event``).

    Raises:
        ValueError: If a metadata patch is produced but no ``urn_nir`` is
            known from either the event or prior builder state.
    """

    # Skipped variants are only counted; they contribute no timeline data.
    if event.skip_reason is not None:
        self.state.skipped_variants += 1
        return
    # First non-None urn_nir wins; later events never overwrite it.
    if self.state.urn_nir is None and event.urn_nir is not None:
        self.state.urn_nir = event.urn_nir
    self.state.input_versions.add(event.version_tag)
    for update in event.update_evidence:
        self.state.update_evidence_state.entries.append(update)

    for node_event in event.node_events:
        # Lazily create per-node timeline state keyed by (type, id).
        key = NodeKey(node_type=node_event.node_type, node_id=node_event.node_id)
        node_state = self.state.node_timeline_index.get(key)
        if node_state is None:
            node_state = NodeTimelineState(node_type=node_event.node_type, node_id=node_event.node_id)
            self.state.node_timeline_index[key] = node_state

        interval_state = node_state.intervals.get(node_event.interval_id)
        if interval_state is None:
            interval_state = IntervalTimelineState(
                interval_id=node_event.interval_id,
                start_date=node_event.start_date,
                end_date=node_event.end_date,
                content_version_id=node_event.content_version_id,
                missing_reason=node_event.missing_reason,
            )
            node_state.intervals[node_event.interval_id] = interval_state
        else:
            # Dates are always refreshed, but content is only overwritten when
            # the event actually carries a content version — an event without
            # content never clears previously seen content. The missing reason
            # is only tracked while no content is known for the interval.
            interval_state.start_date = node_event.start_date
            interval_state.end_date = node_event.end_date
            if node_event.content_version_id is not None:
                interval_state.content_version_id = node_event.content_version_id
                interval_state.missing_reason = node_event.missing_reason
            elif interval_state.content_version_id is None:
                interval_state.missing_reason = node_event.missing_reason

        # Register the payload once per content version id (first writer wins).
        if node_event.content_version_id is not None and node_event.content_payload is not None:
            self.state.content_blobs.setdefault(
                node_event.content_version_id,
                (node_event.node_type, node_event.content_payload),
            )

        # Deduplicate observations/updates while preserving first-seen order.
        for observation in node_event.observations:
            if observation not in interval_state.observations:
                interval_state.observations.append(observation)
        for legal_update in node_event.legal_updates:
            if legal_update not in interval_state.legal_updates:
                interval_state.legal_updates.append(legal_update)

    metadata_event = event.metadata_event
    if event.metadata_values is not None:
        # Full metadata values supersede any pre-built metadata event: derive
        # a patch against the currently known values instead.
        patch = _metadata_patch_values(
            previous=self.state.metadata_current_values,
            current=event.metadata_values,
        )
        self.state.metadata_current_values = dict(event.metadata_values)
        if patch is not None:
            set_values, unset_values = patch
            metadata_urn_nir = event.urn_nir or self.state.urn_nir
            if metadata_urn_nir is None:
                raise ValueError("Metadata patch event requires urn_nir")
            metadata_event = MetadataEvent(
                urn_nir=metadata_urn_nir,
                # Undated events fall back to date.min so they sort first.
                effective_date=event.expression_date or date.min,
                set_values=set_values,
                unset_values=unset_values,
            )
        else:
            # No difference against current values: suppress the event.
            metadata_event = None

    if metadata_event is not None:
        # One event per effective date; a later event on the same date wins.
        self.state.metadata_timeline.by_effective_date[metadata_event.effective_date] = metadata_event

finalize

finalize() -> FinalizedActBundle

Finalize state into canonical row payloads.

Source code in src/law_graph/delta/builder.py
def finalize(self) -> FinalizedActBundle:
    """Finalize state into canonical row payloads.

    Produces deterministic, fully sorted row lists for node intervals,
    missing intervals, content blobs, and metadata deltas.

    Raises:
        ValueError: If no ``urn_nir`` was ever observed.
    """

    act_urn = self.state.urn_nir
    if act_urn is None:
        raise ValueError("Builder state has no urn_nir")
    version_tags = sorted(self.state.input_versions)

    interval_rows: list[dict[str, Any]] = []
    missing_rows: list[dict[str, Any]] = []

    # Deterministic ordering: nodes by (type, id), intervals by date range.
    ordered_nodes = sorted(
        self.state.node_timeline_index.items(),
        key=lambda entry: (entry[0].node_type.value, entry[0].node_id),
    )
    for node_key, timeline in ordered_nodes:
        ordered_intervals = sorted(
            timeline.intervals.values(),
            key=lambda iv: (
                iv.start_date or date.min,
                iv.end_date or date.max,
                iv.interval_id,
            ),
        )
        for iv in ordered_intervals:
            start_iso = iv.start_date.isoformat() if iv.start_date else None
            end_iso = iv.end_date.isoformat() if iv.end_date else None
            interval_rows.append(
                {
                    "node_type": node_key.node_type.value,
                    "node_id": node_key.node_id,
                    "interval_id": iv.interval_id,
                    "start_date": start_iso,
                    "end_date": end_iso,
                    "content_version_id": iv.content_version_id,
                    "missing_reason": iv.missing_reason,
                    "observations": list(iv.observations),
                    "legal_updates": list(iv.legal_updates),
                }
            )
            # Intervals without resolved content also feed the missing report.
            if iv.content_version_id is None:
                missing_rows.append(
                    {
                        "urn_nir": act_urn,
                        "node_type": node_key.node_type.value,
                        "node_id": node_key.node_id,
                        "start_date": start_iso,
                        "end_date": end_iso,
                        "reason": iv.missing_reason,
                    }
                )

    blob_rows: list[dict[str, Any]] = []
    for version_id in sorted(self.state.content_blobs):
        blob_node_type, blob_payload = self.state.content_blobs[version_id]
        blob_rows.append(
            {
                "content_version_id": version_id,
                "node_type": blob_node_type.value,
                "payload": blob_payload,
            }
        )

    delta_rows: list[dict[str, Any]] = []
    for when, timeline_event in sorted(self.state.metadata_timeline.by_effective_date.items()):
        delta_rows.append(
            {
                "urn_nir": timeline_event.urn_nir,
                "effective_date": when.isoformat(),
                "set_values": timeline_event.set_values,
                "unset_values": list(timeline_event.unset_values),
            }
        )

    return FinalizedActBundle(
        urn_nir=act_urn,
        input_versions=version_tags,
        node_interval_rows=interval_rows,
        content_blob_rows=blob_rows,
        metadata_delta_rows=delta_rows,
        missing_interval_rows=missing_rows,
    )

build_canonical_records

build_canonical_records(
    *,
    snapshots: list[ActSnapshot],
    update_evidence: Iterable[NormalizedUpdateEvidence],
) -> tuple[BaseActRecord, list[MetadataDeltaRecord]]

Build canonical record rows and metadata deltas from snapshots.

Source code in src/law_graph/delta/builder.py
def build_canonical_records(
    *,
    snapshots: list[ActSnapshot],
    update_evidence: Iterable[NormalizedUpdateEvidence],
) -> tuple[BaseActRecord, list[MetadataDeltaRecord]]:
    """Build canonical record rows and metadata deltas from snapshots.

    Args:
        snapshots: Ingested act snapshots (one per version tag); must be
            non-empty.
        update_evidence: Normalized legal-update evidence to attach to node
            intervals via transition windows.

    Returns:
        A ``(base, metadata_records)`` pair: the canonical ``BaseActRecord``
        with interval nodes and deduplicated content blobs, plus the ordered
        list of metadata delta records keyed by effective date.

    Raises:
        ValueError: If ``snapshots`` is empty.
    """

    if not snapshots:
        raise ValueError("snapshots must be non-empty")

    # The latest snapshot fixes the canonical act identity (urn_nir).
    latest = _choose_latest_snapshot(snapshots)
    urn_nir = latest.act.urn_nir
    base = BaseActRecord(
        urn_nir=urn_nir,
        input_versions=sorted({snapshot.version_tag for snapshot in snapshots}, key=_version_tag_sort_key),
    )
    update_index = build_update_evidence_index(update_evidence)

    # (node_type, node_id) -> known validity intervals / observed payloads.
    node_intervals: dict[tuple[NodeType, str], set[tuple[date | None, date | None]]] = defaultdict(set)
    observed_contents: dict[tuple[NodeType, str], list[tuple[ActSnapshot, BlobModel]]] = defaultdict(list)

    for snapshot in snapshots:
        # Only the first act expression of each snapshot is projected.
        # NOTE(review): list(...) around relation attributes presumably forces
        # evaluation of ORM-style collections — confirm against the models.
        act_expressions = list(snapshot.act.act_expressions)
        if not act_expressions:
            continue
        act_expr = act_expressions[0]

        for article in list(act_expr.articles):
            node_id = article.article_urn_nir
            key = (NodeType.ARTICLE, node_id)
            # Article versions carry explicit validity ranges.
            for version in list(article.article_versions):
                node_intervals[key].add(
                    (_as_date(version.article_version_start_date), _as_date(version.article_version_end_date))
                )
            observed_contents[key].append((snapshot, _article_payload(article)))
            # References and notes are tracked as distinct node types that
            # share the article's node id; their intervals are derived later
            # from observation dates.
            observed_contents[(NodeType.ARTICLE_REFERENCES, node_id)].append(
                (snapshot, _article_references_payload(article))
            )
            observed_contents[(NodeType.ARTICLE_NOTES, node_id)].append((snapshot, _article_notes_payload(article)))

        # Act-level group payloads are keyed by the act's own urn_nir.
        structure_payload = ActStructureBlob(
            titles=[_title_structure_payload(title) for title in list(act_expr.titles)],
            chapters=[_chapter_structure_payload(chapter) for chapter in list(act_expr.chapters)],
        )
        observed_contents[(NodeType.ACT_STRUCTURE, urn_nir)].append((snapshot, structure_payload))

        observed_contents[(NodeType.PREFACES, urn_nir)].append(
            (
                snapshot,
                PrefaceGroupBlob(prefaces=[PrefaceBlob.from_snapshot(preface) for preface in list(act_expr.prefaces)]),
            )
        )
        observed_contents[(NodeType.REFERENCES, urn_nir)].append(
            (
                snapshot,
                ReferenceGroupBlob(
                    references=[ReferenceBlob.from_snapshot(reference) for reference in list(act_expr.references)]
                ),
            )
        )

        observed_contents[(NodeType.PREAMBLE, urn_nir)].append(
            (
                snapshot,
                PreambleBlob(
                    preamble_formula_text=act_expr.preamble_formula_text,
                    preamble_paragraph_text=act_expr.preamble_paragraph_text,
                ),
            )
        )
        observed_contents[(NodeType.CONCLUSIONS, urn_nir)].append(
            (
                snapshot,
                ConclusionsBlob(
                    conclusions_formula_text=act_expr.conclusions_formula_text,
                    conclusions_paragraph_text=act_expr.conclusions_paragraph_text,
                ),
            )
        )
        observed_contents[(NodeType.EXPRESSION_URIS, urn_nir)].append(
            (
                snapshot,
                ExpressionUrisBlob(
                    frbr_expression_akn_uri=act_expr.frbr_expression_akn_uri,
                    frbr_manifestation_akn_uri=act_expr.frbr_manifestation_akn_uri,
                    language_iso3=act_expr.language_iso3,
                    expression_date=_as_date(act_expr.expression_date),
                ),
            )
        )

        for attachment in list(snapshot.act.attachments):
            node_id = attachment.attachment_akn_uri
            key = (NodeType.ATTACHMENT, node_id)
            # Attachment versions also carry explicit validity ranges.
            for version in list(attachment.attachment_versions):
                node_intervals[key].add(
                    (_as_date(version.attachment_version_start_date), _as_date(version.attachment_version_end_date))
                )
            observed_contents[key].append((snapshot, _attachment_payload(attachment)))

    # Act-level group nodes get one interval per consecutive pair of snapshot
    # expression dates; the last interval is open-ended (end=None).
    snapshot_dates = sorted(
        {snapshot.expression_date for snapshot in snapshots if snapshot.expression_date is not None}
    )
    if snapshot_dates:
        group_intervals = [
            (
                snapshot_dates[index],
                snapshot_dates[index + 1] if index + 1 < len(snapshot_dates) else None,
            )
            for index in range(len(snapshot_dates))
        ]
        for group_type in (
            NodeType.ACT_STRUCTURE,
            NodeType.PREFACES,
            NodeType.REFERENCES,
            NodeType.PREAMBLE,
            NodeType.CONCLUSIONS,
            NodeType.EXPRESSION_URIS,
        ):
            node_intervals[(group_type, urn_nir)].update(group_intervals)
    else:
        # No dated snapshots: a single unbounded interval per group node.
        for group_type in (
            NodeType.ACT_STRUCTURE,
            NodeType.PREFACES,
            NodeType.REFERENCES,
            NodeType.PREAMBLE,
            NodeType.CONCLUSIONS,
            NodeType.EXPRESSION_URIS,
        ):
            node_intervals[(group_type, urn_nir)].add((None, None))

    # Article references/notes have no explicit version ranges; derive their
    # intervals from the dates on which they were observed, same scheme as
    # the group intervals above.
    for (node_type, node_id), observations in observed_contents.items():
        if node_type not in {NodeType.ARTICLE_REFERENCES, NodeType.ARTICLE_NOTES}:
            continue
        observed_dates = sorted(
            {snapshot.expression_date for snapshot, _ in observations if snapshot.expression_date is not None}
        )
        if observed_dates:
            intervals = [
                (
                    observed_dates[index],
                    observed_dates[index + 1] if index + 1 < len(observed_dates) else None,
                )
                for index in range(len(observed_dates))
            ]
            node_intervals[(node_type, node_id)].update(intervals)
        else:
            node_intervals[(node_type, node_id)].add((None, None))

    blob_by_id: dict[str, ContentBlob] = {}
    interval_nodes: list[IntervalNode] = []

    def _upsert_blob(*, node_type: NodeType, content: BlobModel) -> str:
        # Deduplicate blobs by content-derived version id; first writer wins.
        version_id = _content_version_id(node_type=node_type, content=content)
        blob = blob_by_id.get(version_id)
        if blob is None:
            blob = ContentBlob(
                content_version_id=version_id,
                node_type=node_type,
                content=content.model_copy(deep=True),
            )
            blob_by_id[version_id] = blob
        return version_id

    # Deterministic node order: by node-type value, then node id.
    for (node_type, node_id), intervals in sorted(
        node_intervals.items(), key=lambda item: (item[0][0].value, item[0][1])
    ):
        slots = sorted(intervals, key=lambda interval: (interval[0] or date.min, interval[1] or date.max))
        observations = list(observed_contents.get((node_type, node_id), []))

        node_intervals_out: list[NodeInterval] = []
        # For these node types an undated snapshot cannot be matched to an
        # interval, so it is skipped rather than treated as date.min.
        requires_expression_date = node_type in {
            NodeType.ARTICLE,
            NodeType.ATTACHMENT,
            NodeType.ARTICLE_REFERENCES,
            NodeType.ARTICLE_NOTES,
        }
        for start, end in slots:
            interval_id = _interval_id(node_id=node_id, start=start, end=end)
            candidates: list[tuple[date, str, BlobModel]] = []
            interval_observations: list[NodeVersionContentSource] = []
            for snapshot, content in observations:
                if snapshot.expression_date is None:
                    if requires_expression_date:
                        continue
                    observed_date = date.min
                else:
                    observed_date = snapshot.expression_date
                if not _covers(observed_date, start, end):
                    continue
                candidates.append((observed_date, snapshot.version_tag, content))
                interval_observations.append(
                    NodeVersionContentSource(version_tag=snapshot.version_tag, observed_at=snapshot.expression_date)
                )

            if candidates:
                # Pick the newest covering observation; ties broken by
                # version-tag sort order.
                _, _, chosen = max(
                    candidates, key=lambda candidate: (candidate[0], _version_tag_sort_key(candidate[1]))
                )
                content_vid = _upsert_blob(node_type=node_type, content=chosen)
                node_intervals_out.append(
                    NodeInterval(
                        interval_id=interval_id,
                        start_date=start,
                        end_date=end,
                        content_version_id=content_vid,
                        content_missing_reason=None,
                        observations=interval_observations,
                    )
                )
            else:
                # No snapshot payload covers this interval: emit a content-less
                # interval carrying an explicit missing reason.
                node_intervals_out.append(
                    NodeInterval(
                        interval_id=interval_id,
                        start_date=start,
                        end_date=end,
                        content_version_id=None,
                        content_missing_reason="no_snapshot_payload_for_interval",
                        observations=interval_observations,
                    )
                )

        interval_nodes.append(IntervalNode(node_type=node_type, node_id=node_id, intervals=node_intervals_out))

        # Retain every observed payload as a blob, even those not selected
        # for any interval.
        for _, content in observations:
            _upsert_blob(node_type=node_type, content=content)

    def _assign_interval_updates(node: IntervalNode) -> None:
        # Attach legal updates to each interval using the transition window
        # between it and the chronologically previous interval.
        candidates = candidate_updates_for_node(
            update_index=update_index,
            urn_nir=urn_nir,
            node_type=node.node_type,
            node_id=node.node_id,
        )
        if not candidates:
            return
        ordered = sorted(
            node.intervals, key=lambda interval: (interval.start_date or date.min, interval.end_date or date.max)
        )
        previous_interval: NodeInterval | None = None
        for interval in ordered:
            window_start, window_end = interval_transition_window(previous_interval, interval)
            updates = select_interval_updates(
                node_type=node.node_type,
                window_start=window_start,
                window_end=window_end,
                candidates=candidates,
            )
            interval.legal_updates = [update.model_copy(deep=True) for update in updates]
            previous_interval = interval

    for node in interval_nodes:
        _assign_interval_updates(node)

    base.content_blobs = [blob_by_id[blob_id] for blob_id in sorted(blob_by_id)]
    base.interval_nodes = interval_nodes

    # Fold snapshot metadata into per-effective-date patches, replaying
    # snapshots in chronological order so each patch diffs against its
    # predecessor.
    metadata_by_date: dict[date, MetadataDelta] = {}
    previous_meta: ActMetadata | None = None

    ordered_snapshots = sorted(
        snapshots,
        key=lambda snapshot: (
            snapshot.expression_date or date.min,
            _version_tag_sort_key(snapshot.version_tag),
        ),
    )

    for snapshot in ordered_snapshots:
        current_meta = _extract_metadata(snapshot.act)
        patch = _metadata_patch(previous_meta, current_meta)
        previous_meta = current_meta
        if patch is None:
            continue

        # Undated snapshots fall back to the date parsed from the version
        # tag, or date.min as a last resort.
        effective_date = snapshot.expression_date
        if effective_date is None:
            try:
                effective_date = _parse_version_tag(snapshot.version_tag)
            except ValueError:
                effective_date = date.min

        existing = metadata_by_date.get(effective_date)
        if existing is None:
            metadata_by_date[effective_date] = patch
            continue

        # Two patches on the same date are merged; the later patch's set
        # values win on key collisions, unset keys are unioned.
        merged_set: dict[str, JsonValue] = {}
        if existing.set is not None:
            merged_set.update(cast(dict[str, JsonValue], existing.set.model_dump(mode="json", exclude_none=True)))
        if patch.set is not None:
            merged_set.update(cast(dict[str, JsonValue], patch.set.model_dump(mode="json", exclude_none=True)))

        merged_unset = sorted(set(existing.unset) | set(patch.unset))

        merged_patch = MetadataDelta(
            set=(ActMetadataPatch.model_validate(merged_set) if merged_set else None),
            unset=merged_unset,
        )
        metadata_by_date[effective_date] = merged_patch

    metadata_records = [
        MetadataDeltaRecord(
            urn_nir=urn_nir,
            effective_date=effective_date,
            set=patch.set,
            unset=patch.unset,
        )
        for effective_date, patch in sorted(metadata_by_date.items())
    ]

    return base, metadata_records

build_variant_event

build_variant_event(
    *,
    descriptor: VariantDescriptor,
    act: Act,
    metadata: dict[str, JsonValue] | None,
    source_presence: frozenset[str],
) -> VariantEvent

Project one parsed Act payload into canonical incremental event contracts.

Source code in src/law_graph/delta/builder.py
def build_variant_event(
    *,
    descriptor: VariantDescriptor,
    act: Act,
    metadata: dict[str, JsonValue] | None,
    source_presence: frozenset[str],
) -> VariantEvent:
    """Project one parsed Act payload into canonical incremental event contracts."""

    # The first act expression (when present) fixes the expression date.
    expression_date = None
    if act.act_expressions:
        expression_date = act.act_expressions[0].expression_date

    snapshot = ActSnapshot(
        version_tag=descriptor.version_tag,
        expression_date=expression_date,
        act=act,
        metadata=metadata,
    )
    projected_urn, projected_nodes, projected_metadata = _project_node_events(snapshot=snapshot)
    projected_values = _extract_metadata_values(act=act)
    variant_fingerprint = _fingerprint_payload(
        descriptor=descriptor,
        expression_date=expression_date,
        urn_nir=projected_urn,
    )
    return VariantEvent(
        manifest_id=descriptor.manifest_id,
        version_tag=descriptor.version_tag,
        variant_id=descriptor.variant_id,
        variant_fingerprint=variant_fingerprint,
        expression_date=expression_date,
        urn_nir=projected_urn,
        node_events=projected_nodes,
        metadata_event=projected_metadata,
        metadata_values=projected_values,
        source_presence=source_presence,
        update_evidence=(),
        skip_reason=None,
    )

Resolver

resolver

Resolve an act view at a given date by replaying deltas.

Classes:

Name Description
MissingNodeContent

Placeholder for a node interval without resolved content.

ResolvedActView

Resolved view of an act at a given date.

Functions:

Name Description
resolve_state

Resolve the act view for the given date `at`.

MissingNodeContent dataclass

MissingNodeContent(interval_id: str, reason: str)

Placeholder for a node interval without resolved content.

ResolvedActView dataclass

ResolvedActView(
    urn_nir: str,
    effective_date: date,
    metadata: ActMetadata,
    nodes: dict[tuple[NodeType, str], ResolvedNodeValue],
    intervals: dict[tuple[NodeType, str], NodeInterval],
    act: Act,
)

Resolved view of an act at a given date.

resolve_state

resolve_state(
    *,
    base: BaseActRecord,
    metadata_deltas: list[MetadataDeltaRecord],
    at: date,
    scope: set[NodeType] | None = None,
) -> ResolvedActView

Resolve the act view for the given date `at`.

Source code in src/law_graph/delta/resolver.py
def resolve_state(
    *,
    base: BaseActRecord,
    metadata_deltas: list[MetadataDeltaRecord],
    at: date,
    scope: set[NodeType] | None = None,
) -> ResolvedActView:
    """Resolve the act view for date ``at``."""

    # Replay metadata deltas chronologically up to (and including) the cutoff.
    metadata_state: ActMetadata | None = None
    for delta in sorted(metadata_deltas, key=lambda record: record.effective_date):
        if delta.effective_date > at:
            break
        metadata_state = _apply_metadata_delta(metadata_state, delta)

    if metadata_state is None:
        raise ValueError("No metadata deltas found before requested date")

    all_nodes, all_intervals = _resolve_nodes_at_date(base=base, at=at)
    # The returned view is filtered by scope, but the resolved act below is
    # always built from the full node set.
    if scope is None:
        visible_nodes = all_nodes
        visible_intervals = all_intervals
    else:
        visible_nodes = {key: value for key, value in all_nodes.items() if key[0] in scope}
        visible_intervals = {key: value for key, value in all_intervals.items() if key[0] in scope}

    metadata_view = metadata_state.model_copy(deep=True)
    metadata_view.mark_present(_optional_present_fields(metadata_state))

    resolved_act = _build_resolved_act(
        urn_nir=base.urn_nir,
        effective_date=at,
        metadata=metadata_state,
        resolved_nodes=all_nodes,
    )

    return ResolvedActView(
        urn_nir=base.urn_nir,
        effective_date=at,
        metadata=metadata_view,
        nodes=visible_nodes,
        intervals=visible_intervals,
        act=resolved_act,
    )