Storage API
Ingest Store
ingest_manifest_db
Law ingest hub orchestration boundary for ingest manifest persistence.
IngestManifestDbStore owns run policy, batching/deduping behavior, lock
scope, and unit-of-work coordination across repository primitives.
Classes:
| Name | Description |
|---|---|
| IngestManifestDbStore | Application-facing orchestration boundary for ingest persistence. |
IngestManifestDbStore
dataclass
Application-facing orchestration boundary for ingest persistence.
Methods:
| Name | Description |
|---|---|
| close | Dispose underlying DB resources eagerly. |
| commit_act_side_effects | Commit side-effect rowsets in one transaction boundary. |
| commit_parse_side_effects | Commit parse/checkpoint side effects from runner intent payloads. |
| finalize_act_build | Finalize one canonical act build in a single unit of work. |
| finish_run | Mark a previously started run as finished with final status. |
| load_builder_state_v2 | Load compact incremental builder state restart checkpoint. |
| load_latest_missing_snapshot | Load the most recent missing-interval snapshot metadata and rows. |
| load_version_states | Return nested map: owner_alias_value -> version_tag -> source hashes. |
| persist_finalized_build | Persist one finalized canonical build from runner intent payloads. |
| record_progress_batch | Record act-progress checkpoints with store-owned dedupe policy. |
| record_variant_events_batch | Record variant-event checkpoints with store-owned dedupe policy. |
| record_version_states_batch | Record version-state upserts with store-owned dedupe/ID resolution. |
| save_builder_state_v2 | Persist compact incremental builder state for restart checkpoints. |
| save_missing_snapshot | Persist a missing-interval snapshot and return its meta_id. |
| start_run | Create and persist an ingest run row, returning its run id. |
| validate_contract | Validate canonical ingest schema invariants. |
close
Dispose underlying DB resources eagerly.
commit_act_side_effects
commit_act_side_effects(
*,
source: str,
owner_alias_kind: str,
run_id: str,
progress_rows: list[tuple[str, IngestActProgressRowV2]]
| None = None,
variant_event_rows: list[
tuple[str, IngestVariantEventRowV2]
]
| None = None,
version_state_rows: list[
tuple[str, str, dict[str, str], str]
]
| None = None,
) -> None
Commit side-effect rowsets in one transaction boundary.
Source code in src/law_graph/storage/ingest_manifest_db.py
commit_parse_side_effects
commit_parse_side_effects(
*,
source: str,
owner_alias_kind: str,
run_id: str,
progress_checkpoints: list[
IngestProgressCheckpointInput
]
| None = None,
variant_event_checkpoints: list[
IngestVariantEventCheckpointInput
]
| None = None,
version_state_checkpoints: list[
IngestVersionStateCheckpointInput
]
| None = None,
) -> None
Commit parse/checkpoint side effects from runner intent payloads.
Source code in src/law_graph/storage/ingest_manifest_db.py
finalize_act_build
finalize_act_build(
*,
source: str,
owner_alias_kind: str,
owner_alias_value: str,
run_id: str,
urn_nir: str,
version_tag: str,
input_versions: list[str],
interval_node_count: int,
content_blob_count: int,
original_payloads: dict[str, Any] | None,
variant_diagnostic: IngestBuildVariantDiagnosticRow
| None = None,
node_intervals: list[IngestNodeIntervalRow],
content_blobs: list[IngestContentBlobRow],
metadata_deltas: list[IngestMetadataDeltaRow],
) -> int
Finalize one canonical act build in a single unit of work.
Source code in src/law_graph/storage/ingest_manifest_db.py
finish_run
Mark a previously started run as finished with final status.
Source code in src/law_graph/storage/ingest_manifest_db.py
load_builder_state_v2
load_builder_state_v2(
*,
source: str,
owner_alias_kind: str,
owner_alias_value: str,
run_id: str,
) -> dict[str, Any] | None
Load compact incremental builder state restart checkpoint.
Source code in src/law_graph/storage/ingest_manifest_db.py
load_latest_missing_snapshot
load_latest_missing_snapshot() -> tuple[
MissingSnapshotMetadata | None,
dict[
SourceOwnerAliasKey, list[IngestMissingIntervalRow]
],
]
Load the most recent missing-interval snapshot metadata and rows.
Source code in src/law_graph/storage/ingest_manifest_db.py
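The repository-level fetch_latest_missing_snapshot groups rows under tuple[str, str, str] keys, which suggests that SourceOwnerAliasKey is a (source, owner_alias_kind, owner_alias_value) triple. The sketch below shows that grouping step under this assumption; MissingIntervalRow is a hypothetical stand-in for IngestMissingIntervalRow, and its start/end fields are illustrative only.

```python
from collections import defaultdict
from typing import NamedTuple

# Assumed key shape: (source, owner_alias_kind, owner_alias_value).
SourceOwnerAliasKey = tuple[str, str, str]


class MissingIntervalRow(NamedTuple):
    """Hypothetical stand-in for IngestMissingIntervalRow."""

    source: str
    owner_alias_kind: str
    owner_alias_value: str
    start: str  # illustrative interval bounds
    end: str


def group_missing_intervals(
    rows: list[MissingIntervalRow],
) -> dict[SourceOwnerAliasKey, list[MissingIntervalRow]]:
    """Group flat interval rows by owner key, keeping row order within each group."""
    grouped: dict[SourceOwnerAliasKey, list[MissingIntervalRow]] = defaultdict(list)
    for row in rows:
        key = (row.source, row.owner_alias_kind, row.owner_alias_value)
        grouped[key].append(row)
    # Return a plain dict so missing keys raise KeyError instead of growing the map.
    return dict(grouped)
```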
load_version_states
load_version_states(
*, source: str, owner_alias_kind: str
) -> dict[str, dict[str, dict[str, str]]]
Return nested map: owner_alias_value -> version_tag -> source hashes.
Source code in src/law_graph/storage/ingest_manifest_db.py
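A minimal sketch of how flat rows could be folded into the owner_alias_value -> version_tag -> source hashes shape, assuming each stored row carries its hashes as a JSON object. The row layout and the sample hash keys are hypothetical.

```python
import json


def nest_version_states(
    rows: list[tuple[str, str, str]],
) -> dict[str, dict[str, dict[str, str]]]:
    """Fold (owner_alias_value, version_tag, hashes_json) rows into a nested map."""
    nested: dict[str, dict[str, dict[str, str]]] = {}
    for owner_alias_value, version_tag, hashes_json in rows:
        # Create the per-owner level on first sight, then set the version entry.
        by_version = nested.setdefault(owner_alias_value, {})
        by_version[version_tag] = json.loads(hashes_json)
    return nested
```

A caller could compare these stored hashes with freshly computed ones to skip versions whose inputs have not changed; that use is an assumption, not something this page states.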
persist_finalized_build
persist_finalized_build(
*,
source: str,
owner_alias_kind: str,
run_id: str,
payload: IngestFinalizedBuildPayloadInput,
) -> int
Persist one finalized canonical build from runner intent payloads.
Source code in src/law_graph/storage/ingest_manifest_db.py
record_progress_batch
record_progress_batch(
*,
source: str,
owner_alias_kind: str,
run_id: str,
rows: list[tuple[str, IngestActProgressRowV2]],
session: Session | None = None,
) -> None
Record act-progress checkpoints with store-owned dedupe policy.
Source code in src/law_graph/storage/ingest_manifest_db.py
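The store-owned dedupe policy is not spelled out on this page. A minimal sketch, assuming last-write-wins keyed on the leading string of each (str, row) tuple (presumably an owner alias value): collapsing duplicates before a bulk upsert also sidesteps backends such as PostgreSQL, which reject an INSERT ... ON CONFLICT DO UPDATE that would affect the same row twice in one statement. ProgressRow and its fields are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProgressRow:
    """Hypothetical stand-in for IngestActProgressRowV2."""

    status: str
    updated_at: str


def dedupe_last_wins(
    rows: list[tuple[str, ProgressRow]],
) -> list[tuple[str, ProgressRow]]:
    """Keep the last row seen for each key, ordered by each key's first appearance."""
    latest: dict[str, ProgressRow] = {}
    for key, row in rows:
        # Re-assigning an existing key replaces the value but keeps the
        # key's original insertion position in the dict.
        latest[key] = row
    return list(latest.items())
```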
record_variant_events_batch
record_variant_events_batch(
*,
source: str,
owner_alias_kind: str,
run_id: str,
rows: list[tuple[str, IngestVariantEventRowV2]],
session: Session | None = None,
) -> None
Record variant-event checkpoints with store-owned dedupe policy.
Source code in src/law_graph/storage/ingest_manifest_db.py
record_version_states_batch
record_version_states_batch(
*,
source: str,
owner_alias_kind: str,
rows: list[tuple[str, str, dict[str, str], str]],
session: Session | None = None,
) -> None
Record version-state upserts with store-owned dedupe/ID resolution.
Source code in src/law_graph/storage/ingest_manifest_db.py
save_builder_state_v2
save_builder_state_v2(
*,
source: str,
owner_alias_kind: str,
owner_alias_value: str,
run_id: str,
state: dict[str, Any],
) -> None
Persist compact incremental builder state for restart checkpoints.
Source code in src/law_graph/storage/ingest_manifest_db.py
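The repository primitive upsert_builder_state_raw takes run_id, source_ref_id, state_json and updated_at, which suggests that save_builder_state_v2 serializes the state dict to JSON and upserts one ig_builder_state_v2 row per (run, source ref), and that load_builder_state_v2 reverses this or returns None. The sketch below shows that round trip with SQLite's ON CONFLICT upsert (SQLite 3.24+). The composite primary key, the compact JSON encoding, and the save_state/load_state names are assumptions; now_iso only mirrors the documented summary of the repository helper.

```python
import json
import sqlite3
from datetime import datetime, timezone
from typing import Any, Optional


def now_iso() -> str:
    """Timezone-aware UTC timestamp in ISO-8601 form."""
    return datetime.now(timezone.utc).isoformat()


def save_state(
    conn: sqlite3.Connection, run_id: str, source_ref_id: int, state: dict[str, Any]
) -> None:
    """Insert or overwrite the single state row for (run_id, source_ref_id)."""
    state_json = json.dumps(state, separators=(",", ":"))  # compact encoding
    with conn:  # commit on success, roll back on error
        conn.execute(
            """
            INSERT INTO ig_builder_state_v2 (run_id, source_ref_id, state_json, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT (run_id, source_ref_id) DO UPDATE SET
                state_json = excluded.state_json,
                updated_at = excluded.updated_at
            """,
            (run_id, source_ref_id, state_json, now_iso()),
        )


def load_state(
    conn: sqlite3.Connection, run_id: str, source_ref_id: int
) -> Optional[dict[str, Any]]:
    """Return the decoded state, or None when no checkpoint exists."""
    row = conn.execute(
        "SELECT state_json FROM ig_builder_state_v2"
        " WHERE run_id = ? AND source_ref_id = ?",
        (run_id, source_ref_id),
    ).fetchone()
    return None if row is None else json.loads(row[0])
```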
save_missing_snapshot
save_missing_snapshot(
*,
run_id: str,
source: str,
owner_alias_kind: str,
metadata: MissingSnapshotMetadata,
hints_by_source_owner_alias: dict[
SourceOwnerAliasKey,
list[IngestMissingIntervalRow | dict[str, Any]],
],
) -> int
Persist a missing-interval snapshot and return its meta_id.
Source code in src/law_graph/storage/ingest_manifest_db.py
start_run
start_run(
*,
params: dict[str, Any],
download_manifest_path: Path,
download_manifest_sha256: str | None,
) -> str
Create and persist an ingest run row, returning its run id.
Source code in src/law_graph/storage/ingest_manifest_db.py
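start_run accepts an optional download_manifest_sha256 alongside the manifest path. The sketch below shows one way a caller might compute that digest by streaming the file through hashlib, plus a hypothetical run-row payload. The uuid4-based run id and the params_json field are illustrative assumptions, not the store's actual schema.

```python
import hashlib
import json
import uuid
from pathlib import Path
from typing import Any, Optional


def sha256_of_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hex SHA-256 of a file, read in chunks so memory use stays bounded."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def build_run_row(
    params: dict[str, Any],
    download_manifest_path: Path,
    download_manifest_sha256: Optional[str],
) -> dict[str, Any]:
    """Hypothetical run-row payload; field names are illustrative."""
    return {
        "run_id": uuid.uuid4().hex,  # random 32-character hex id
        "params_json": json.dumps(params, sort_keys=True),  # stable key order
        "download_manifest_path": str(download_manifest_path),
        "download_manifest_sha256": download_manifest_sha256,
    }
```

Sorting keys in params_json makes two runs with the same parameters serialize identically, which helps when comparing runs.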
validate_contract
Validate canonical ingest schema invariants.
Raises:
| Type | Description |
|---|---|
| RuntimeError | If schema version table is missing or stale. |
Source code in src/law_graph/storage/ingest_manifest_db.py
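validate_contract raises RuntimeError when the schema version table is missing or stale. A minimal sketch of those two checks against SQLite's sqlite_master catalog follows; the ig_schema_version table, its version column, the expected value "2", and the function name are all hypothetical.

```python
import sqlite3

EXPECTED_SCHEMA_VERSION = "2"  # hypothetical expected version


def validate_schema_version(
    conn: sqlite3.Connection, expected: str = EXPECTED_SCHEMA_VERSION
) -> None:
    """Raise RuntimeError if the version table is absent or holds another version."""
    exists = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'ig_schema_version'"
    ).fetchone()
    if exists is None:
        raise RuntimeError("schema version table is missing")
    row = conn.execute("SELECT version FROM ig_schema_version").fetchone()
    found = None if row is None else row[0]  # an empty table counts as stale
    if found != expected:
        raise RuntimeError(
            f"schema version is stale: found {found!r}, expected {expected!r}"
        )
```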
Ingest Repository
ingest_repository
SQL primitive repository for ingest manifest DB operations.
This module owns table-oriented persistence primitives only. Application-level
orchestration policy (locking, batching/deduping strategy, retries, and unit of
work decisions across multiple primitives) belongs in
IngestManifestDbStore.
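Most primitives below take an optional session, and the paragraph above assigns unit-of-work decisions to the store. One way to read that split: a primitive executes inside the caller's transaction when a session is passed and opens its own otherwise, while the store groups several primitives under one commit. The sketch models this with sqlite3 connections in place of SQLModel sessions; ig_run, insert_run and store_insert_runs are hypothetical.

```python
import sqlite3
from typing import Optional


def insert_run(
    conn: sqlite3.Connection,
    run_id: str,
    session: Optional[sqlite3.Connection] = None,
) -> None:
    """Hypothetical primitive following the session=None convention."""
    sql = "INSERT INTO ig_run (run_id) VALUES (?)"
    if session is not None:
        # Caller owns the transaction: execute only, never commit here.
        session.execute(sql, (run_id,))
    else:
        # No session: open and commit a transaction of our own.
        with conn:
            conn.execute(sql, (run_id,))


def store_insert_runs(conn: sqlite3.Connection, run_ids: list[str]) -> None:
    """Store-level unit of work: all inserts commit together or not at all."""
    with conn:  # commits on success, rolls back if any insert raises
        for run_id in run_ids:
            insert_run(conn, run_id, session=conn)
```

Keeping commit decisions out of the primitives is what lets the store retry or batch them without partial writes leaking out.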
Classes:
| Name | Description |
|---|---|
| IngestRepository | Repository for ingest manifest state and canonical ingest rows. |
IngestRepository
Repository for ingest manifest state and canonical ingest rows.
Methods:
| Name | Description |
|---|---|
| close | Dispose pooled DB connections eagerly. |
| ensure_act | Resolve or create canonical act hub row. |
| ensure_source_ref | Resolve or create a source publication reference row. |
| fetch_latest_missing_snapshot | Fetch latest snapshot metadata and grouped interval rows. |
| fetch_version_states | Fetch nested owner_alias_value -> version_tag -> source_hashes map. |
| find_source_ref_id | Resolve a source publication reference id without creating rows. |
| finish_run | Update run status and finished timestamp. |
| insert_missing_interval_rows_bulk_raw | Insert normalized ig_missing_interval rows. |
| insert_missing_snapshot_meta_raw | Insert one ig_missing_interval_meta row and return meta_id. |
| load_builder_state_v2 | Load compact incremental builder state spill row. |
| map_source_to_act | Upsert resolved source->act mapping row. |
| now_iso | Return current UTC timestamp in ISO-8601. |
| replace_act_build_rows_raw | Replace build-dependent rowsets for one canonical build id. |
| start_run | Insert ingest run row and return run id. |
| upsert_act_build_raw | Upsert normalized ig_act_build row and return build_id. |
| upsert_act_progress_rows_bulk_raw | Upsert normalized ig_act_progress_v2 rows keyed by IDs. |
| upsert_builder_state_raw | Upsert one normalized ig_builder_state_v2 row. |
| upsert_variant_event_rows_bulk_raw | Upsert normalized ig_variant_event_v2 rows keyed by IDs. |
| upsert_version_states_bulk_raw | Upsert normalized ig_version_state rows keyed by resolved IDs. |
Attributes:
| Name | Type | Description |
|---|---|---|
| engine | Engine | Expose SQLModel engine for low-level checks/migrations. |
Source code in src/law_graph/storage/repositories/ingest_repository.py
close
Dispose pooled DB connections eagerly.
ensure_act
Resolve or create canonical act hub row.
Source code in src/law_graph/storage/repositories/ingest_repository.py
ensure_source_ref
ensure_source_ref(
*,
source: str,
owner_alias_kind: str,
owner_alias_value: str,
session: Session | None = None,
) -> int
Resolve or create a source publication reference row.
Source code in src/law_graph/storage/repositories/ingest_repository.py
fetch_latest_missing_snapshot
fetch_latest_missing_snapshot() -> tuple[
IngestMissingIntervalMeta | None,
dict[
tuple[str, str, str], list[IngestMissingIntervalRow]
],
]
Fetch latest snapshot metadata and grouped interval rows.
Source code in src/law_graph/storage/repositories/ingest_repository.py
fetch_version_states
fetch_version_states(
*, source: str, owner_alias_kind: str
) -> dict[str, dict[str, dict[str, str]]]
Fetch nested owner_alias_value -> version_tag -> source_hashes map.
Source code in src/law_graph/storage/repositories/ingest_repository.py
find_source_ref_id
find_source_ref_id(
*,
source: str,
owner_alias_kind: str,
owner_alias_value: str,
session: Session | None = None,
) -> int | None
Resolve a source publication reference id without creating rows.
Source code in src/law_graph/storage/repositories/ingest_repository.py
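ensure_source_ref and find_source_ref_id share the natural key (source, owner_alias_kind, owner_alias_value) and differ only in whether a missing row is created. A minimal get-or-create sketch with sqlite3 follows, assuming a UNIQUE constraint on that key; the ig_source_ref table and the ensure_ref/find_ref_id helpers are hypothetical and simplified (no session parameter).

```python
import sqlite3
from typing import Optional

# Constant SQL fragment; the values themselves stay parameterized.
_MATCH = "source = ? AND owner_alias_kind = ? AND owner_alias_value = ?"


def find_ref_id(
    conn: sqlite3.Connection, source: str, kind: str, value: str
) -> Optional[int]:
    """Look up the id for the natural key; never inserts."""
    row = conn.execute(
        f"SELECT id FROM ig_source_ref WHERE {_MATCH}", (source, kind, value)
    ).fetchone()
    return None if row is None else row[0]


def ensure_ref(conn: sqlite3.Connection, source: str, kind: str, value: str) -> int:
    """Insert the natural key if absent, then return its id."""
    # INSERT OR IGNORE is a no-op when the UNIQUE constraint already matches.
    conn.execute(
        "INSERT OR IGNORE INTO ig_source_ref (source, owner_alias_kind, owner_alias_value)"
        " VALUES (?, ?, ?)",
        (source, kind, value),
    )
    ref_id = find_ref_id(conn, source, kind, value)
    if ref_id is None:  # only possible if the row was deleted in between
        raise RuntimeError("source ref row disappeared after insert")
    return ref_id
```

Letting the unique constraint arbitrate, rather than checking first and inserting second, keeps two writers from creating duplicate rows for the same key.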
finish_run
Update run status and finished timestamp.
Source code in src/law_graph/storage/repositories/ingest_repository.py
insert_missing_interval_rows_bulk_raw
insert_missing_interval_rows_bulk_raw(
*,
rows: list[dict[str, Any]],
session: Session | None = None,
) -> None
Insert normalized ig_missing_interval rows.
Source code in src/law_graph/storage/repositories/ingest_repository.py
insert_missing_snapshot_meta_raw
insert_missing_snapshot_meta_raw(
*,
run_id: str,
generated_at: str,
acts_total: int,
acts_with_missing: int,
missing_intervals_total: int,
acts_expected: int | None,
complete: bool,
download_manifest_path: str,
download_manifest_sha256: str | None,
ingest_manifest_path: str | None,
ingest_manifest_sha256: str | None,
schema_version: str,
session: Session | None = None,
) -> int
Insert one ig_missing_interval_meta row and return meta_id.
Source code in src/law_graph/storage/repositories/ingest_repository.py
load_builder_state_v2
load_builder_state_v2(
*,
source: str,
owner_alias_kind: str,
owner_alias_value: str,
run_id: str,
) -> dict[str, Any] | None
Load compact incremental builder state spill row.
Source code in src/law_graph/storage/repositories/ingest_repository.py
map_source_to_act
map_source_to_act(
*,
source_ref_id: int,
act_id: int,
run_id: str | None,
confidence: float | None = None,
evidence_json: str | None = None,
session: Session | None = None,
) -> None
Upsert resolved source->act mapping row.
Source code in src/law_graph/storage/repositories/ingest_repository.py
now_iso
staticmethod
Return current UTC timestamp in ISO-8601.
replace_act_build_rows_raw
replace_act_build_rows_raw(
*,
build_id: int,
urn_nir: str,
variant_diagnostic: IngestBuildVariantDiagnosticRow
| None,
node_intervals: list[IngestNodeIntervalRow],
content_blobs: list[IngestContentBlobRow],
metadata_deltas: list[IngestMetadataDeltaRow],
session: Session | None = None,
) -> None
Replace build-dependent rowsets for one canonical build id.
Source code in src/law_graph/storage/repositories/ingest_repository.py
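replace_act_build_rows_raw replaces build-dependent rowsets keyed by one build id. A common way to get replace semantics is delete-then-insert inside a single transaction, which makes re-running the same build idempotent. The sketch shows the idea for one child table with sqlite3; ig_node_interval, its columns and replace_build_intervals are hypothetical, and the repository may implement replacement differently.

```python
import sqlite3


def replace_build_intervals(
    conn: sqlite3.Connection,
    build_id: int,
    intervals: list[tuple[str, str, str]],
) -> None:
    """Replace all interval rows of one build with the given (node_id, from, to) rows."""
    with conn:  # DELETE and INSERTs commit together or roll back together
        conn.execute("DELETE FROM ig_node_interval WHERE build_id = ?", (build_id,))
        conn.executemany(
            "INSERT INTO ig_node_interval (build_id, node_id, valid_from, valid_to)"
            " VALUES (?, ?, ?, ?)",
            [(build_id, node_id, start, end) for node_id, start, end in intervals],
        )
```

Rows belonging to other build ids are never touched, so rebuilding one act cannot disturb another.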
start_run
start_run(
*,
params: dict[str, Any],
download_manifest_path: Path,
download_manifest_sha256: str | None,
) -> str
Insert ingest run row and return run id.
Source code in src/law_graph/storage/repositories/ingest_repository.py
upsert_act_build_raw
upsert_act_build_raw(
*,
act_id: int,
version_tag: str,
run_id: str,
input_versions_json: str,
interval_node_count: int,
content_blob_count: int,
original_payloads_json: str | None,
session: Session | None = None,
) -> int
Upsert normalized ig_act_build row and return build_id.
Source code in src/law_graph/storage/repositories/ingest_repository.py
upsert_act_progress_rows_bulk_raw
upsert_act_progress_rows_bulk_raw(
*,
rows: list[dict[str, Any]],
session: Session | None = None,
) -> None
Upsert normalized ig_act_progress_v2 rows keyed by IDs.
Source code in src/law_graph/storage/repositories/ingest_repository.py
upsert_builder_state_raw
upsert_builder_state_raw(
*,
run_id: str,
source_ref_id: int,
state_json: str,
updated_at: str,
session: Session | None = None,
) -> None
Upsert one normalized ig_builder_state_v2 row.
Source code in src/law_graph/storage/repositories/ingest_repository.py
upsert_variant_event_rows_bulk_raw
upsert_variant_event_rows_bulk_raw(
*,
rows: list[dict[str, Any]],
session: Session | None = None,
) -> None
Upsert normalized ig_variant_event_v2 rows keyed by IDs.
Source code in src/law_graph/storage/repositories/ingest_repository.py
upsert_version_states_bulk_raw
upsert_version_states_bulk_raw(
*,
rows: list[dict[str, Any]],
session: Session | None = None,
) -> None
Upsert normalized ig_version_state rows keyed by resolved IDs.
Source code in src/law_graph/storage/repositories/ingest_repository.py
Download Repository
download_repository
SQL primitives and schema lifecycle for download manifest persistence.
Classes:
| Name | Description |
|---|---|
| DownloadRepository | Repository owning schema setup and table-oriented download primitives. |
DownloadRepository
Repository owning schema setup and table-oriented download primitives.
Methods:
| Name | Description |
|---|---|
| close | Dispose pooled DB connections eagerly. |
| count_artifact_items | Count distinct artifact roots currently tracked in the manifest. |
| count_missing_artifact_paths | Count manifest file paths that are missing on disk under root_dir. |
| create_constraints | Create schema indexes and compatibility tables required by the store. |
| create_views | Create and refresh SQL views used by coverage/reporting queries. |
| fetch_artifact_rows | Yield joined artifact rows used to assemble artifact inventories. |
| fetch_coverage_rows | Fetch denormalized act coverage rows for reporting and export views. |
| get_any_path_for_sha256 | Return one manifest path for a given blob hash, if present. |
| scan_missing_artifact_paths | Scan manifest file paths for on-disk presence. |
| validate_schema_contract | Validate required download-manifest columns and nullability constraints. |
Attributes:
| Name | Type | Description |
|---|---|---|
| engine | Engine | Return the SQLAlchemy engine backing this repository. |
Source code in src/law_graph/storage/repositories/download_repository.py
close
Dispose pooled DB connections eagerly.
count_artifact_items
Count distinct artifact roots currently tracked in the manifest.
Source code in src/law_graph/storage/repositories/download_repository.py
count_missing_artifact_paths
Count manifest file paths that are missing on disk under root_dir.
Source code in src/law_graph/storage/repositories/download_repository.py
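The on-disk half of count_missing_artifact_paths can be sketched independently of the manifest query. The sketch assumes manifest paths are stored relative to root_dir and treats anything that is not a regular file (absent, or a directory) as missing; count_missing_under_root is a hypothetical helper, not the repository method.

```python
from pathlib import Path
from typing import Iterable


def count_missing_under_root(paths: Iterable[str], root_dir: Path) -> int:
    """Count relative manifest paths that do not resolve to a regular file under root_dir."""
    return sum(1 for rel_path in paths if not (root_dir / rel_path).is_file())
```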
create_constraints
Create schema indexes and compatibility tables required by the store.
Source code in src/law_graph/storage/repositories/download_repository.py
create_views
Create and refresh SQL views used by coverage/reporting queries.
Source code in src/law_graph/storage/repositories/download_repository.py
fetch_artifact_rows
fetch_artifact_rows(
*, order_by_path: bool
) -> Iterator[
tuple[ActFile, ArtifactBlob, SourceRef | None]
]
Yield joined artifact rows used to assemble artifact inventories.
Source code in src/law_graph/storage/repositories/download_repository.py
fetch_coverage_rows
fetch_coverage_rows(
*,
owner_alias_kind: str | None,
urn_alias_kind: str,
shard_mod: int | None,
shard_rem: int | None,
) -> list[dict[str, Any]]
Fetch denormalized act coverage rows for reporting and export views.
Source code in src/law_graph/storage/repositories/download_repository.py
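fetch_coverage_rows takes shard_mod and shard_rem but this page does not define them. A minimal sketch, assuming modulo sharding on an integer key with None meaning "no sharding"; the act_id field and both helper names are hypothetical.

```python
from typing import Any, Optional


def in_shard(key: int, shard_mod: Optional[int], shard_rem: Optional[int]) -> bool:
    """Return True if key falls in shard shard_rem out of shard_mod shards."""
    if shard_mod is None or shard_rem is None:
        return True  # sharding disabled: every key qualifies
    if shard_mod <= 0 or not 0 <= shard_rem < shard_mod:
        raise ValueError("expected shard_mod > 0 and 0 <= shard_rem < shard_mod")
    return key % shard_mod == shard_rem


def filter_coverage_rows(
    rows: list[dict[str, Any]],
    shard_mod: Optional[int],
    shard_rem: Optional[int],
) -> list[dict[str, Any]]:
    """Keep rows whose hypothetical integer act_id falls in the requested shard."""
    return [row for row in rows if in_shard(row["act_id"], shard_mod, shard_rem)]
```

With non-negative integer keys, the shards for shard_rem = 0 .. shard_mod - 1 are disjoint and together cover every row, which makes the pair convenient for splitting an export across workers. The same predicate can be pushed into SQL as act_id % ? = ?.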
get_any_path_for_sha256
Return one manifest path for a given blob hash, if present.
Source code in src/law_graph/storage/repositories/download_repository.py
scan_missing_artifact_paths
Scan manifest file paths for on-disk presence.
Returns:
| Type | Description |
|---|---|
| tuple[int, int] | A tuple of |
Source code in src/law_graph/storage/repositories/download_repository.py
validate_schema_contract
Validate required download-manifest columns and nullability constraints.