Skip to content

CLI API

cli

Typer CLI entry point.

Functions:

Name Description
download_ita

Download Normattiva datasets and update the inventory database.

ingest_ita

Ingest downloaded datasets into canonical SQLite outputs.

backfill_ita

Run backfill downloads using ingest-produced missing-interval hints.

storage_upload

Upload selected local directories to S3-compatible object storage.

storage_download

Download selected object storage prefixes into local directories.

storage_purge

Delete all objects under selected prefixes in object storage.

download_ita

download_ita(
    verbose: bool = Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose logging",
    ),
    output_dir: Path = Option(
        Path("exports/normattiva/ita"),
        "--output-dir",
        "-o",
        help="Directory for storing downloaded datasets.",
    ),
    overlap_days: int = Option(
        2,
        "--overlap-days",
        help="Number of days to overlap when computing incremental download windows.",
    ),
    skip_history_bulk: bool = Option(
        True,
        "--skip-history-bulk/--no-skip-history-bulk",
        help="Skip historical V/O bulk windows (older than multivigente start).",
    ),
    per_act_concurrency: int = Option(
        10,
        "--per-act-concurrency",
        help="Max parallel per-act exports (default 10).",
    ),
    per_act_export_modes: str = Option(
        "V,M,O",
        "--per-act-export-modes",
        help="Comma-separated export modes for per-act fallback. V=vigente, M=multivigente, O=originario. Order determines priority.",
    ),
    per_act_export_formats: str = Option(
        "akn,json,nir",
        "--per-act-export-formats",
        help="Comma-separated formats to download. Options: json,akn,nir. Default 'akn,json,nir' (nir runs after akn/json).",
    ),
    predefined_export_formats: str = Option(
        "akn,json,nir",
        "--predefined-export-formats",
        help="Comma-separated predefined collection export formats. Options: akn,json,nir.",
    ),
    predefined_vigenza_modes: str = Option(
        "O,M,V",
        "--predefined-vigenza-modes",
        help="Comma-separated predefined collection vigenza modes. Options: O,M,V.",
    ),
    shard_mod: int | None = Option(
        None,
        "--shard-mod",
        help="Shard modulus for deterministic sharding by act_id (e.g. 2 for two machines).",
    ),
    shard_rem: int | None = Option(
        None,
        "--shard-rem",
        help="Shard remainder for deterministic sharding by act_id (e.g. 0 or 1 when --shard-mod=2).",
    ),
    richiesta_export: str = Option(
        "M",
        "--richiesta-export",
        "-r",
        help="Async export tipo richiesta: 'M' multivigente, 'V' vigente, 'O' originario.",
    ),
    build_manifest: bool = Option(
        True,
        "--build-manifest/--no-build-manifest",
        help="Build or skip the act manifest enumeration step.",
    ),
    manifest_start_year: int = Option(
        1861,
        "--manifest-start-year",
        help="Start year for act manifest enumeration.",
    ),
    manifest_end_year: int | None = Option(
        None,
        "--manifest-end-year",
        help="Optional end year for act manifest enumeration.",
    ),
    manifest_year_window: int = Option(
        1,
        "--manifest-year-window",
        help="Year window size for act manifest enumeration.",
    ),
    manifest_year_window_concurrency: int = Option(
        1,
        "--manifest-window-concurrency",
        help="Max concurrent act manifest windows.",
    ),
    manifest_page_size: int = Option(
        200,
        "--manifest-page-size",
        help="Page size for act manifest enumeration.",
    ),
    manifest_max_pages: int | None = Option(
        None,
        "--manifest-max-pages",
        help="Optional page cap per manifest enumeration window.",
    ),
    manifest_page_delay_seconds: float = Option(
        0.5,
        "--manifest-page-delay",
        help="Delay (seconds) between manifest page requests.",
    ),
    manifest_window_delay_seconds: float = Option(
        1.0,
        "--manifest-window-delay",
        help="Delay (seconds) before each manifest window request.",
    ),
    manifest_request_retry_attempts: int = Option(
        3,
        "--manifest-request-retries",
        help="Retry attempts for manifest requests.",
    ),
    manifest_request_retry_sleep_seconds: float = Option(
        5.0,
        "--manifest-request-retry-sleep",
        help="Sleep (seconds) between manifest request retries.",
    ),
    log_file: Path | None = Option(
        None,
        "--log-file",
        help="Optional file path to write download logs.",
    ),
    manifest_db_path: Path | None = Option(
        None,
        "--manifest-db-path",
        help="Optional override path for `download_manifest.db` (defaults to output_dir/download_manifest.db).",
    ),
    dedupe_by_sha: bool = Option(
        True,
        "--dedupe-by-sha/--no-dedupe-by-sha",
        help="Skip writing duplicate artifacts when sha256 matches an existing one.",
    ),
) -> None

Download Normattiva datasets and update the inventory database.

Source code in src/law_graph/cli.py
@download_app.command("ita")
def download_ita(
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging"),
    output_dir: Path = typer.Option(
        Path("exports/normattiva/ita"),
        "--output-dir",
        "-o",
        help="Directory for storing downloaded datasets.",
    ),
    overlap_days: int = typer.Option(
        2,
        "--overlap-days",
        help="Number of days to overlap when computing incremental download windows.",
    ),
    skip_history_bulk: bool = typer.Option(
        True,
        "--skip-history-bulk/--no-skip-history-bulk",
        help="Skip historical V/O bulk windows (older than multivigente start).",
    ),
    per_act_concurrency: int = typer.Option(
        10,
        "--per-act-concurrency",
        help="Max parallel per-act exports (default 10).",
    ),
    per_act_export_modes: str = typer.Option(
        "V,M,O",
        "--per-act-export-modes",
        help="Comma-separated export modes for per-act fallback. V=vigente, M=multivigente, O=originario. Order determines priority.",
    ),
    per_act_export_formats: str = typer.Option(
        "akn,json,nir",
        "--per-act-export-formats",
        help="Comma-separated formats to download. Options: json,akn,nir. Default 'akn,json,nir' (nir runs after akn/json).",
    ),
    predefined_export_formats: str = typer.Option(
        "akn,json,nir",
        "--predefined-export-formats",
        help="Comma-separated predefined collection export formats. Options: akn,json,nir.",
    ),
    predefined_vigenza_modes: str = typer.Option(
        "O,M,V",
        "--predefined-vigenza-modes",
        help="Comma-separated predefined collection vigenza modes. Options: O,M,V.",
    ),
    shard_mod: int | None = typer.Option(
        None,
        "--shard-mod",
        help="Shard modulus for deterministic sharding by act_id (e.g. 2 for two machines).",
    ),
    shard_rem: int | None = typer.Option(
        None,
        "--shard-rem",
        help="Shard remainder for deterministic sharding by act_id (e.g. 0 or 1 when --shard-mod=2).",
    ),
    richiesta_export: str = typer.Option(
        "M",
        "--richiesta-export",
        "-r",
        help="Async export tipo richiesta: 'M' multivigente, 'V' vigente, 'O' originario.",
    ),
    build_manifest: bool = typer.Option(
        True,
        "--build-manifest/--no-build-manifest",
        help="Build or skip the act manifest enumeration step.",
    ),
    manifest_start_year: int = typer.Option(
        1861,
        "--manifest-start-year",
        help="Start year for act manifest enumeration.",
    ),
    manifest_end_year: int | None = typer.Option(
        None,
        "--manifest-end-year",
        help="Optional end year for act manifest enumeration.",
    ),
    manifest_year_window: int = typer.Option(
        1,
        "--manifest-year-window",
        help="Year window size for act manifest enumeration.",
    ),
    manifest_year_window_concurrency: int = typer.Option(
        1,
        "--manifest-window-concurrency",
        help="Max concurrent act manifest windows.",
    ),
    manifest_page_size: int = typer.Option(
        200,
        "--manifest-page-size",
        help="Page size for act manifest enumeration.",
    ),
    manifest_max_pages: int | None = typer.Option(
        None,
        "--manifest-max-pages",
        help="Optional page cap per manifest enumeration window.",
    ),
    manifest_page_delay_seconds: float = typer.Option(
        0.5,
        "--manifest-page-delay",
        help="Delay (seconds) between manifest page requests.",
    ),
    manifest_window_delay_seconds: float = typer.Option(
        1.0,
        "--manifest-window-delay",
        help="Delay (seconds) before each manifest window request.",
    ),
    manifest_request_retry_attempts: int = typer.Option(
        3,
        "--manifest-request-retries",
        help="Retry attempts for manifest requests.",
    ),
    manifest_request_retry_sleep_seconds: float = typer.Option(
        5.0,
        "--manifest-request-retry-sleep",
        help="Sleep (seconds) between manifest request retries.",
    ),
    log_file: Path | None = typer.Option(
        None,
        "--log-file",
        help="Optional file path to write download logs.",
    ),
    manifest_db_path: Path | None = typer.Option(
        None,
        "--manifest-db-path",
        help="Optional override path for `download_manifest.db` (defaults to output_dir/download_manifest.db).",
    ),
    dedupe_by_sha: bool = typer.Option(
        True,
        "--dedupe-by-sha/--no-dedupe-by-sha",
        help="Skip writing duplicate artifacts when sha256 matches an existing one.",
    ),
) -> None:
    """Download Normattiva datasets and update the inventory database."""
    # Downloads Normattiva payloads using the Italy downloader.
    #
    # Notes on selected options:
    #   verbose          -- enables DEBUG logging when True (INFO otherwise).
    #   output_dir       -- directory where downloaded payloads and the manifest
    #                       are stored.
    #   overlap_days     -- number of days to overlap when computing incremental
    #                       windows.
    #   richiesta_export -- Normattiva richiesta code ("M", "V" or "O").
    #   log_file         -- optional file where logs are persisted in addition
    #                       to stdout.
    #
    # Raises typer.BadParameter when the shard options are inconsistent.

    setup_logging(level="DEBUG" if verbose else "INFO", log_file=log_file, log_name="download")

    def _split_csv(raw: str, *, upper: bool) -> tuple[str, ...]:
        """Split a comma-separated option, dropping blanks and normalising case."""
        stripped = (piece.strip() for piece in raw.split(","))
        return tuple(piece.upper() if upper else piece.lower() for piece in stripped if piece)

    # Modes are normalised to upper case, formats to lower case.
    act_modes = _split_csv(per_act_export_modes, upper=True)
    act_formats = _split_csv(per_act_export_formats, upper=False)
    collection_formats = _split_csv(predefined_export_formats, upper=False)
    collection_modes = _split_csv(predefined_vigenza_modes, upper=True)

    # Sharding needs both halves of the (modulus, remainder) pair or neither.
    mod_given = shard_mod is not None
    rem_given = shard_rem is not None
    if mod_given != rem_given:
        raise typer.BadParameter("--shard-mod and --shard-rem must be provided together")
    if shard_mod is not None:
        if shard_mod <= 0:
            raise typer.BadParameter("--shard-mod must be > 0")
        if shard_rem is None or not (0 <= shard_rem < shard_mod):
            raise typer.BadParameter("--shard-rem must satisfy 0 <= rem < mod")

    pipeline = get_origin_pipeline(OriginId.NORMATTIVA)
    download_config = pipeline.download_config_type(
        output_dir=output_dir,
        build_manifest=build_manifest,
        manifest_start_year=manifest_start_year,
        manifest_end_year=manifest_end_year,
        manifest_year_window=manifest_year_window,
        manifest_year_window_concurrency=manifest_year_window_concurrency,
        manifest_page_size=manifest_page_size,
        manifest_max_pages=manifest_max_pages,
        manifest_page_delay_seconds=manifest_page_delay_seconds,
        manifest_window_delay_seconds=manifest_window_delay_seconds,
        manifest_request_retry_attempts=manifest_request_retry_attempts,
        manifest_request_retry_sleep_seconds=manifest_request_retry_sleep_seconds,
        overlap_days=overlap_days,
        richiesta_export=richiesta_export,
        per_act_concurrency=per_act_concurrency,
        per_act_export_modes=act_modes,
        per_act_export_formats=act_formats,
        predefined_export_formats=collection_formats,
        predefined_vigenza_modes=collection_modes,
        per_act_shard_mod=shard_mod,
        per_act_shard_rem=shard_rem,
        skip_history_bulk=skip_history_bulk,
        manifest_db_path=manifest_db_path,
        dedupe_by_sha=dedupe_by_sha,
        origin_source=pipeline.origin_id.value,
        owner_alias_kind=pipeline.alias_kinds.owner_alias_kind,
        log_file=log_file,
        verbose=verbose,
    )
    outcome = pipeline.download(download_config)

    # Summarise where the run left its artefacts.
    get_logger(command="download").info(
        "Download completed inventory_db={inventory_db} act_manifest={act_manifest} per_act_attempts={attempts}",
        inventory_db=str(outcome.inventory_db_path),
        act_manifest=str(outcome.act_manifest_path),
        attempts=outcome.per_act_attempts,
    )

ingest_ita

ingest_ita(
    input_dir: Path = Option(
        Path("exports/normattiva/ita"),
        "--input-dir",
        "-i",
        help="Directory containing Normattiva exports and download_manifest.db.",
    ),
    output_dir: Path = Option(
        Path("exports/processed/ita"),
        "--output-dir",
        "-o",
        help="Output directory for canonical ingest state DB (ingest_manifest.db).",
    ),
    limit: int | None = Option(
        None,
        "--limit",
        "-n",
        help="Optional limit of acts to process (for quick local runs).",
    ),
    verbose: bool = Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose debug logging for ingestion.",
    ),
    log_file: Path | None = Option(
        None,
        "--log-file",
        help="Optional file path to write ingestion logs.",
    ),
    quiet: bool = Option(
        False,
        "--quiet",
        help="Reduce logging verbosity during ingestion (suppresses per-node INFO).",
    ),
    act_concurrency: int | None = Option(
        None,
        "--act-concurrency",
        help="Number of parallel acts processed concurrently. Defaults to a CPU-based heuristic.",
    ),
    act_batch_size: int | None = Option(
        None,
        "--act-batch-size",
        help="Maximum number of in-flight variant tasks queued globally.",
    ),
    max_inflight_variants_per_act: int | None = Option(
        None,
        "--max-inflight-variants-per-act",
        help="Cap for in-flight variants of a single act (default: auto == worker count).",
    ),
    memory_budget_mb: int | None = Option(
        None,
        "--memory-budget-mb",
        help="Approximate worker-memory budget in MB for ingestion scheduling decisions (default: auto, 80%% of available RAM).",
    ),
    act_max_tasks: int | None = Option(
        15,
        "--act-max-tasks",
        help="Maximum number of act tasks a worker handles before respawn (maxtasksperchild).",
    ),
    ingest_manifest_path: Path | None = Option(
        None,
        "--ingest-manifest",
        help="Path to the ingest manifest DB used for incremental builds.",
    ),
    missing_intervals_path: Path | None = Option(
        None,
        "--missing-intervals-path",
        help="Optional path to ingest manifest DB used for missing-interval snapshots (defaults to output_dir/ingest_manifest.db).",
    ),
    write_missing_intervals: bool = Option(
        True,
        "--write-missing-intervals/--no-write-missing-intervals",
        help="Enable or disable writing missing-interval hints (records persist across runs and are merged incrementally).",
    ),
    force_reparse: bool = Option(
        False,
        "--force-reparse",
        help="Process all acts even if unchanged and rebuild missing-interval hints from scratch.",
    ),
) -> None

Ingest downloaded datasets into canonical SQLite outputs.

Source code in src/law_graph/cli.py
@ingest_app.command("ita")
def ingest_ita(
    input_dir: Path = typer.Option(
        Path("exports/normattiva/ita"),
        "--input-dir",
        "-i",
        help="Directory containing Normattiva exports and download_manifest.db.",
    ),
    output_dir: Path = typer.Option(
        Path("exports/processed/ita"),
        "--output-dir",
        "-o",
        help="Output directory for canonical ingest state DB (ingest_manifest.db).",
    ),
    limit: int | None = typer.Option(
        None,
        "--limit",
        "-n",
        help="Optional limit of acts to process (for quick local runs).",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose debug logging for ingestion.",
    ),
    log_file: Path | None = typer.Option(
        None,
        "--log-file",
        help="Optional file path to write ingestion logs.",
    ),
    quiet: bool = typer.Option(
        False,
        "--quiet",
        help="Reduce logging verbosity during ingestion (suppresses per-node INFO).",
    ),
    act_concurrency: int | None = typer.Option(
        None,
        "--act-concurrency",
        help="Number of parallel acts processed concurrently. Defaults to a CPU-based heuristic.",
    ),
    act_batch_size: int | None = typer.Option(
        None,
        "--act-batch-size",
        help="Maximum number of in-flight variant tasks queued globally.",
    ),
    max_inflight_variants_per_act: int | None = typer.Option(
        None,
        "--max-inflight-variants-per-act",
        help="Cap for in-flight variants of a single act (default: auto == worker count).",
    ),
    memory_budget_mb: int | None = typer.Option(
        None,
        "--memory-budget-mb",
        help="Approximate worker-memory budget in MB for ingestion scheduling decisions (default: auto, 80% of available RAM).",
    ),
    act_max_tasks: int | None = typer.Option(
        15,
        "--act-max-tasks",
        help="Maximum number of act tasks a worker handles before respawn (maxtasksperchild).",
    ),
    ingest_manifest_path: Path | None = typer.Option(
        None,
        "--ingest-manifest",
        help="Path to the ingest manifest DB used for incremental builds.",
    ),
    missing_intervals_path: Path | None = typer.Option(
        None,
        "--missing-intervals-path",
        help="Optional path to ingest manifest DB used for missing-interval snapshots (defaults to output_dir/ingest_manifest.db).",
    ),
    write_missing_intervals: bool = typer.Option(
        True,
        "--write-missing-intervals/--no-write-missing-intervals",
        help="Enable or disable writing missing-interval hints (records persist across runs and are merged incrementally).",
    ),
    force_reparse: bool = typer.Option(
        False,
        "--force-reparse",
        help="Process all acts even if unchanged and rebuild missing-interval hints from scratch.",
    ),
) -> None:
    """Ingest downloaded datasets into canonical SQLite outputs."""
    # Converts Normattiva exports into base+delta artifacts.
    #
    # Notes on the options:
    #   input_dir      -- directory containing downloaded payloads and
    #                     ``download_manifest.db``.
    #   output_dir     -- output directory holding ``ingest_manifest.db``.
    #   limit          -- optional upper bound on processed manifest entries.
    #   verbose        -- enables DEBUG logging when True.
    #   log_file       -- optional log destination on disk.
    #   quiet          -- lowers the log level to WARNING when True.
    #   act_concurrency -- number of parallel acts feeding the ingest pipeline.
    #   act_batch_size -- maximum number of queued variant tasks before waiting
    #                     for workers to complete.
    #   max_inflight_variants_per_act -- maximum queued variants allowed for the
    #                     same act; when omitted, defaults to worker count.
    #   memory_budget_mb -- worker-memory budget used by scheduler backpressure
    #                     heuristics; when omitted, defaults to 80% of currently
    #                     available RAM.
    #   act_max_tasks  -- maximum number of tasks a worker handles before it is
    #                     recycled; must be positive when given.
    #   ingest_manifest_path -- optional path to the ingest manifest SQLite DB
    #                     used for incremental builds.
    #   missing_intervals_path -- optional override for the ingest manifest DB
    #                     path used by missing-interval snapshots.
    #   write_missing_intervals -- disable to skip emitting missing-interval
    #                     hints.
    #   force_reparse  -- per its option help: process all acts even if
    #                     unchanged and rebuild missing-interval hints from
    #                     scratch.
    #
    # Raises typer.BadParameter when --act-max-tasks is zero or negative.

    # --verbose takes precedence over --quiet when both are supplied.
    level = "DEBUG" if verbose else ("WARNING" if quiet else "INFO")
    setup_logging(level=level, file_level=level, log_file=log_file, log_name="ingest")
    log = get_logger(command="ingest")
    if act_max_tasks is not None and act_max_tasks <= 0:
        raise typer.BadParameter("--act-max-tasks must be a positive integer", param_hint="--act-max-tasks")
    pipeline = get_origin_pipeline(OriginId.NORMATTIVA)
    result = pipeline.ingest(
        pipeline.ingest_config_type(
            input_dir=input_dir,
            output_dir=output_dir,
            limit=limit,
            quiet=quiet,
            verbose=verbose,
            act_concurrency=act_concurrency,
            act_batch_size=act_batch_size,
            max_inflight_variants_per_act=max_inflight_variants_per_act,
            memory_budget_mb=memory_budget_mb,
            act_max_tasks=act_max_tasks,
            ingest_manifest_path=ingest_manifest_path,
            missing_intervals_path=missing_intervals_path,
            write_missing_intervals=write_missing_intervals,
            force_reparse=force_reparse,
            origin_source=pipeline.origin_id.value,
            owner_alias_kind=pipeline.alias_kinds.owner_alias_kind,
        )
    )
    # Render combo counts as "a+b:count", most frequent first; ties are broken
    # by the sorted member names so the summary is deterministic.
    combo_summary = ", ".join(
        f"{'+'.join(sorted(combo)) or 'none'}:{count}"
        for combo, count in sorted(result.combo_counts.items(), key=lambda item: (-item[1], sorted(item[0])))
    )
    log.info(
        "Ingest completed written={written} output_dir={output_dir} versions={versions} "
        "combos={combos} mismatched_items={mismatched} skipped_versions={skipped} missing_intervals={missing}",
        written=result.written,
        output_dir=str(output_dir),
        versions=result.versions_total,
        combos=combo_summary or "none",
        mismatched=result.items_with_mismatch,
        skipped=result.skipped_versions,
        missing=result.missing_intervals_total,
    )

backfill_ita

backfill_ita(
    input_dir: Path = Option(
        Path("exports/normattiva/ita"),
        "--input-dir",
        "-i",
        help="Directory containing Normattiva exports and download_manifest.db (backfill writes into this tree).",
    ),
    dry_run: bool = Option(
        False,
        "--dry-run",
        help="Plan queries but do not call Normattiva or write files.",
    ),
    limit_items: int | None = Option(
        None,
        "--limit-items",
        help="Optional cap on how many acts to scan.",
    ),
    max_queries_per_item: int | None = Option(
        None,
        "--max-queries-per-item",
        help="Maximum number of vigenza dates to query per act (default: unlimited).",
    ),
    max_total_queries: int | None = Option(
        None,
        "--max-total-queries",
        help="Maximum number of (date,format) queries across the whole run (default: unlimited).",
    ),
    concurrency: int = Option(
        2,
        "--concurrency",
        "-c",
        help="Maximum number of concurrent in-flight Normattiva requests.",
    ),
    min_start_interval_seconds: float = Option(
        1.0,
        "--min-start-interval-seconds",
        help="Minimum delay between starting async searches.",
    ),
    open_interval_offset_days: int = Option(
        30,
        "--open-interval-offset-days",
        help="Days to offset from interval start when end date is open.",
    ),
    cache_path: Path = Option(
        Path("backfill_cache.jsonl"),
        "--cache-path",
        help="Path to the persistent cache index (relative paths resolve under the input directory).",
    ),
    cache_dir: Path = Option(
        Path("backfill_cache"),
        "--cache-dir",
        help="Directory where cached vigente bundles are stored (relative paths resolve under the input directory).",
    ),
    cache_max_bytes: int | None = Option(
        2 * 1024 * 1024 * 1024,
        "--cache-max-bytes",
        help="Max total bytes for cached bundles; set to 0 to disable pruning.",
    ),
    disable_cache: bool = Option(
        False,
        "--disable-cache",
        help="Disable reading and writing the persistent cache (debugging aid).",
    ),
    missing_intervals_path: Path | None = Option(
        None,
        "--missing-intervals-path",
        help="Ingest manifest DB emitted by ingest (required; defaults to processed/ingest_manifest.db when found).",
    ),
    verbose: bool = Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose logging",
    ),
    log_file: Path | None = Option(
        None,
        "--log-file",
        help="Optional file path to write backfill logs.",
    ),
) -> None

Run backfill downloads using ingest-produced missing-interval hints.

Source code in src/law_graph/cli.py
@backfill_app.command("ita")
def backfill_ita(
    input_dir: Path = typer.Option(
        Path("exports/normattiva/ita"),
        "--input-dir",
        "-i",
        help="Directory containing Normattiva exports and download_manifest.db (backfill writes into this tree).",
    ),
    dry_run: bool = typer.Option(
        False,
        "--dry-run",
        help="Plan queries but do not call Normattiva or write files.",
    ),
    limit_items: int | None = typer.Option(
        None,
        "--limit-items",
        help="Optional cap on how many acts to scan.",
    ),
    max_queries_per_item: int | None = typer.Option(
        None,
        "--max-queries-per-item",
        help="Maximum number of vigenza dates to query per act (default: unlimited).",
    ),
    max_total_queries: int | None = typer.Option(
        None,
        "--max-total-queries",
        help="Maximum number of (date,format) queries across the whole run (default: unlimited).",
    ),
    concurrency: int = typer.Option(
        2,
        "--concurrency",
        "-c",
        help="Maximum number of concurrent in-flight Normattiva requests.",
    ),
    min_start_interval_seconds: float = typer.Option(
        1.0,
        "--min-start-interval-seconds",
        help="Minimum delay between starting async searches.",
    ),
    open_interval_offset_days: int = typer.Option(
        30,
        "--open-interval-offset-days",
        help="Days to offset from interval start when end date is open.",
    ),
    cache_path: Path = typer.Option(
        Path("backfill_cache.jsonl"),
        "--cache-path",
        help="Path to the persistent cache index (relative paths resolve under the input directory).",
    ),
    cache_dir: Path = typer.Option(
        Path("backfill_cache"),
        "--cache-dir",
        help="Directory where cached vigente bundles are stored (relative paths resolve under the input directory).",
    ),
    cache_max_bytes: int | None = typer.Option(
        2 * 1024 * 1024 * 1024,
        "--cache-max-bytes",
        help="Max total bytes for cached bundles; set to 0 to disable pruning.",
    ),
    disable_cache: bool = typer.Option(
        False,
        "--disable-cache",
        help="Disable reading and writing the persistent cache (debugging aid).",
    ),
    missing_intervals_path: Path | None = typer.Option(
        None,
        "--missing-intervals-path",
        help="Ingest manifest DB emitted by ingest (required; defaults to processed/ingest_manifest.db when found).",
    ),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging"),
    log_file: Path | None = typer.Option(None, "--log-file", help="Optional file path to write backfill logs."),
) -> None:
    """Run backfill downloads using ingest-produced missing-interval hints."""
    # Backfills missing validity intervals using vigente + dataVigenza.
    #
    # Exits with status 1 (typer.Exit) when the pipeline raises ClientError.

    setup_logging(level="DEBUG" if verbose else "INFO", log_file=log_file, log_name="backfill")
    log = get_logger(command="backfill")

    # Without an explicit path, fall back to the ingest manifest in the sibling
    # "processed" tree (<input_dir>/../../processed/<input_dir name>/), but only
    # if that file already exists.
    if missing_intervals_path is None:
        fallback_db = input_dir.parent.parent / "processed" / input_dir.name / INGEST_MANIFEST_DB_FILENAME
        if fallback_db.exists():
            missing_intervals_path = fallback_db

    try:
        pipeline = get_origin_pipeline(OriginId.NORMATTIVA)
        backfill_config = pipeline.backfill_config_type(
            input_dir=input_dir,
            dry_run=dry_run,
            limit_items=limit_items,
            max_queries_per_item=max_queries_per_item,
            max_total_queries=max_total_queries,
            concurrency=concurrency,
            min_start_interval_seconds=min_start_interval_seconds,
            open_interval_offset_days=open_interval_offset_days,
            cache_path=cache_path,
            cache_dir=cache_dir,
            disable_cache=disable_cache,
            cache_max_bytes=cache_max_bytes,
            missing_intervals_path=missing_intervals_path,
            origin_source=pipeline.origin_id.value,
            owner_alias_kind=pipeline.alias_kinds.owner_alias_kind,
        )
        outcome = pipeline.backfill(backfill_config)
    except ClientError as exc:
        # Log the client error and turn it into a non-zero exit code.
        log.error("{error}", error=str(exc))
        raise typer.Exit(1) from exc

    log.info(
        "Backfill completed dry_run={dry_run} items_scanned={items_scanned} items_with_missing={items_with_missing} "
        "requests_planned={requests_planned} requests_deduped={requests_deduped} cache_hits={cache_hits} "
        "requests_executed={requests_executed} files_written={files_written}",
        dry_run=dry_run,
        items_scanned=outcome.items_scanned,
        items_with_missing=outcome.items_with_missing,
        requests_planned=outcome.requests_planned,
        requests_deduped=outcome.requests_deduped,
        cache_hits=outcome.requests_cache_hits,
        requests_executed=outcome.requests_executed,
        files_written=outcome.files_written,
    )

storage_upload

storage_upload(
    bucket: str | None = Option(
        None,
        "--bucket",
        help="Target object storage bucket (env: LAW_GRAPH_S3_BUCKET).",
    ),
    endpoint_url: str | None = Option(
        None,
        "--endpoint-url",
        help="S3-compatible endpoint URL (env: LAW_GRAPH_S3_ENDPOINT_URL).",
    ),
    region: str | None = Option(
        None,
        "--region",
        help="Bucket region (env: LAW_GRAPH_S3_REGION).",
    ),
    access_key_id: str | None = Option(
        None,
        "--access-key-id",
        help="Access key ID (env: LAW_GRAPH_S3_ACCESS_KEY_ID).",
    ),
    secret_access_key: str | None = Option(
        None,
        "--secret-access-key",
        help="Secret access key (env: LAW_GRAPH_S3_SECRET_ACCESS_KEY).",
    ),
    session_token: str | None = Option(
        None,
        "--session-token",
        help="Session token (env: LAW_GRAPH_S3_SESSION_TOKEN).",
    ),
    max_pool_connections: int | None = Option(
        None,
        "--max-pool-connections",
        min=1,
        help="Max S3 client connection pool size (env: LAW_GRAPH_S3_MAX_POOL_CONNECTIONS).",
    ),
    root_dir: Path = Option(
        Path("."),
        "--root-dir",
        help="Local root directory used to resolve --dir paths and S3 object keys.",
    ),
    directories: list[Path] | None = Option(
        None,
        "--dir",
        help="Directory to upload; repeat for multiple directories.",
    ),
    delete_remote_missing: bool = Option(
        True,
        "--delete-remote-missing/--no-delete-remote-missing",
        help="Delete remote objects under selected prefixes that are absent locally.",
    ),
    skip_existing_non_db: bool = Option(
        True,
        "--skip-existing-non-db/--no-skip-existing-non-db",
        help="Skip re-upload for existing non-DB files.",
    ),
    max_workers: int = Option(
        DEFAULT_UPLOAD_WORKERS,
        "--max-workers",
        min=1,
        help="Max parallel upload workers.",
    ),
    verbose: bool = Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose logging",
    ),
    log_file: Path | None = Option(
        None,
        "--log-file",
        help="Optional file path to write upload logs.",
    ),
) -> None

Upload selected local directories to S3-compatible object storage.

Source code in src/law_graph/cli.py
@storage_app.command("upload")
def storage_upload(
    bucket: str | None = typer.Option(
        None,
        "--bucket",
        help="Target object storage bucket (env: LAW_GRAPH_S3_BUCKET).",
    ),
    endpoint_url: str | None = typer.Option(
        None,
        "--endpoint-url",
        help="S3-compatible endpoint URL (env: LAW_GRAPH_S3_ENDPOINT_URL).",
    ),
    region: str | None = typer.Option(
        None,
        "--region",
        help="Bucket region (env: LAW_GRAPH_S3_REGION).",
    ),
    access_key_id: str | None = typer.Option(
        None,
        "--access-key-id",
        help="Access key ID (env: LAW_GRAPH_S3_ACCESS_KEY_ID).",
    ),
    secret_access_key: str | None = typer.Option(
        None,
        "--secret-access-key",
        help="Secret access key (env: LAW_GRAPH_S3_SECRET_ACCESS_KEY).",
    ),
    session_token: str | None = typer.Option(
        None,
        "--session-token",
        help="Session token (env: LAW_GRAPH_S3_SESSION_TOKEN).",
    ),
    max_pool_connections: int | None = typer.Option(
        None,
        "--max-pool-connections",
        min=1,
        help="Max S3 client connection pool size (env: LAW_GRAPH_S3_MAX_POOL_CONNECTIONS).",
    ),
    root_dir: Path = typer.Option(
        Path("."),
        "--root-dir",
        help="Local root directory used to resolve --dir paths and S3 object keys.",
    ),
    directories: list[Path] | None = typer.Option(
        None,
        "--dir",
        help="Directory to upload; repeat for multiple directories.",
    ),
    delete_remote_missing: bool = typer.Option(
        True,
        "--delete-remote-missing/--no-delete-remote-missing",
        help="Delete remote objects under selected prefixes that are absent locally.",
    ),
    skip_existing_non_db: bool = typer.Option(
        True,
        "--skip-existing-non-db/--no-skip-existing-non-db",
        help="Skip re-upload for existing non-DB files.",
    ),
    max_workers: int = typer.Option(
        DEFAULT_UPLOAD_WORKERS,
        "--max-workers",
        min=1,
        help="Max parallel upload workers.",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose logging",
    ),
    log_file: Path | None = typer.Option(
        None,
        "--log-file",
        help="Optional file path to write upload logs.",
    ),
) -> None:
    """Upload selected local directories to S3-compatible object storage."""
    # NOTE: the docstring above doubles as the CLI help text, so it is kept
    # deliberately short; implementation notes live in comments instead.

    # Runs before the S3 config is built; presumably so .env values can feed
    # the env-var fallbacks named in the option help -- confirm in
    # _build_s3_config.
    load_dotenv()
    setup_logging(
        level="DEBUG" if verbose else "INFO",
        log_file=log_file,
        log_name="storage-upload",
    )

    s3_config = _build_s3_config(
        bucket=bucket,
        endpoint_url=endpoint_url,
        region=region,
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
        session_token=session_token,
        max_pool_connections=max_pool_connections,
    )

    # No --dir given (None or empty) -> use the module-level default set.
    if directories:
        upload_targets = tuple(directories)
    else:
        upload_targets = tuple(DEFAULT_TRANSFER_DIRECTORIES)

    s3_client = build_s3_client(s3_config)
    logger = get_logger(command="storage.upload")

    result = upload_directories(
        client=s3_client,
        config=s3_config,
        root_dir=root_dir,
        directories=upload_targets,
        delete_remote_missing=delete_remote_missing,
        skip_existing_non_db=skip_existing_non_db,
        max_workers=max_workers,
    )

    # Single summary line with the counters reported by upload_directories.
    logger.info(
        "Storage upload completed bucket={bucket} scanned={scanned} uploaded={uploaded} skipped={skipped} deleted={deleted}",
        bucket=s3_config.bucket,
        scanned=result.scanned,
        uploaded=result.uploaded,
        skipped=result.skipped,
        deleted=result.deleted,
    )

storage_download

storage_download(
    bucket: str | None = Option(
        None,
        "--bucket",
        help="Source object storage bucket (env: LAW_GRAPH_S3_BUCKET).",
    ),
    endpoint_url: str | None = Option(
        None,
        "--endpoint-url",
        help="S3-compatible endpoint URL (env: LAW_GRAPH_S3_ENDPOINT_URL).",
    ),
    region: str | None = Option(
        None,
        "--region",
        help="Bucket region (env: LAW_GRAPH_S3_REGION).",
    ),
    access_key_id: str | None = Option(
        None,
        "--access-key-id",
        help="Access key ID (env: LAW_GRAPH_S3_ACCESS_KEY_ID).",
    ),
    secret_access_key: str | None = Option(
        None,
        "--secret-access-key",
        help="Secret access key (env: LAW_GRAPH_S3_SECRET_ACCESS_KEY).",
    ),
    session_token: str | None = Option(
        None,
        "--session-token",
        help="Session token (env: LAW_GRAPH_S3_SESSION_TOKEN).",
    ),
    max_pool_connections: int | None = Option(
        None,
        "--max-pool-connections",
        min=1,
        help="Max S3 client connection pool size (env: LAW_GRAPH_S3_MAX_POOL_CONNECTIONS).",
    ),
    root_dir: Path = Option(
        Path("."),
        "--root-dir",
        help="Local root directory where downloaded objects are written.",
    ),
    prefixes: list[str] | None = Option(
        None,
        "--prefix",
        help="Object key prefix to download; repeat for multiple prefixes.",
    ),
    db_only: bool = Option(
        False,
        "--db-only/--all-files",
        help="Download only database files instead of all matching objects.",
    ),
    skip_existing_non_db: bool = Option(
        True,
        "--skip-existing-non-db/--no-skip-existing-non-db",
        help="Skip downloading when local non-DB files already exist.",
    ),
    max_workers: int = Option(
        DEFAULT_UPLOAD_WORKERS,
        "--max-workers",
        min=1,
        help="Max parallel download workers.",
    ),
    verbose: bool = Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose logging",
    ),
    log_file: Path | None = Option(
        None,
        "--log-file",
        help="Optional file path to write download logs.",
    ),
) -> None

Download selected object storage prefixes into local directories.

Source code in src/law_graph/cli.py
@storage_app.command("download")
def storage_download(
    bucket: str | None = typer.Option(
        None,
        "--bucket",
        help="Source object storage bucket (env: LAW_GRAPH_S3_BUCKET).",
    ),
    endpoint_url: str | None = typer.Option(
        None,
        "--endpoint-url",
        help="S3-compatible endpoint URL (env: LAW_GRAPH_S3_ENDPOINT_URL).",
    ),
    region: str | None = typer.Option(
        None,
        "--region",
        help="Bucket region (env: LAW_GRAPH_S3_REGION).",
    ),
    access_key_id: str | None = typer.Option(
        None,
        "--access-key-id",
        help="Access key ID (env: LAW_GRAPH_S3_ACCESS_KEY_ID).",
    ),
    secret_access_key: str | None = typer.Option(
        None,
        "--secret-access-key",
        help="Secret access key (env: LAW_GRAPH_S3_SECRET_ACCESS_KEY).",
    ),
    session_token: str | None = typer.Option(
        None,
        "--session-token",
        help="Session token (env: LAW_GRAPH_S3_SESSION_TOKEN).",
    ),
    max_pool_connections: int | None = typer.Option(
        None,
        "--max-pool-connections",
        min=1,
        help="Max S3 client connection pool size (env: LAW_GRAPH_S3_MAX_POOL_CONNECTIONS).",
    ),
    root_dir: Path = typer.Option(
        Path("."),
        "--root-dir",
        help="Local root directory where downloaded objects are written.",
    ),
    prefixes: list[str] | None = typer.Option(
        None,
        "--prefix",
        help="Object key prefix to download; repeat for multiple prefixes.",
    ),
    db_only: bool = typer.Option(
        False,
        "--db-only/--all-files",
        help="Download only database files instead of all matching objects.",
    ),
    skip_existing_non_db: bool = typer.Option(
        True,
        "--skip-existing-non-db/--no-skip-existing-non-db",
        help="Skip downloading when local non-DB files already exist.",
    ),
    max_workers: int = typer.Option(
        DEFAULT_UPLOAD_WORKERS,
        "--max-workers",
        min=1,
        help="Max parallel download workers.",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose logging",
    ),
    log_file: Path | None = typer.Option(
        None,
        "--log-file",
        help="Optional file path to write download logs.",
    ),
) -> None:
    """Download selected object storage prefixes into local directories."""
    # NOTE: the docstring above doubles as the CLI help text, so it is kept
    # deliberately short; implementation notes live in comments instead.

    # Runs before the S3 config is built; presumably so .env values can feed
    # the env-var fallbacks named in the option help -- confirm in
    # _build_s3_config.
    load_dotenv()
    setup_logging(
        level="DEBUG" if verbose else "INFO",
        log_file=log_file,
        log_name="storage-download",
    )

    s3_config = _build_s3_config(
        bucket=bucket,
        endpoint_url=endpoint_url,
        region=region,
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
        session_token=session_token,
        max_pool_connections=max_pool_connections,
    )

    # No --prefix given (None or empty) -> use the module-level default set.
    if prefixes:
        download_targets = tuple(prefixes)
    else:
        download_targets = tuple(DEFAULT_TRANSFER_PREFIXES)

    s3_client = build_s3_client(s3_config)
    logger = get_logger(command="storage.download")

    result = download_prefixes(
        client=s3_client,
        config=s3_config,
        root_dir=root_dir,
        prefixes=download_targets,
        db_only=db_only,
        skip_existing_non_db=skip_existing_non_db,
        max_workers=max_workers,
    )

    # The summary's `scanned` counter is reported under the name "discovered".
    logger.info(
        "Storage download completed bucket={bucket} discovered={discovered} downloaded={downloaded} skipped={skipped} db_only={db_only}",
        bucket=s3_config.bucket,
        discovered=result.scanned,
        downloaded=result.downloaded,
        skipped=result.skipped,
        db_only=db_only,
    )

storage_purge

storage_purge(
    bucket: str | None = Option(
        None,
        "--bucket",
        help="Target object storage bucket (env: LAW_GRAPH_S3_BUCKET).",
    ),
    endpoint_url: str | None = Option(
        None,
        "--endpoint-url",
        help="S3-compatible endpoint URL (env: LAW_GRAPH_S3_ENDPOINT_URL).",
    ),
    region: str | None = Option(
        None,
        "--region",
        help="Bucket region (env: LAW_GRAPH_S3_REGION).",
    ),
    access_key_id: str | None = Option(
        None,
        "--access-key-id",
        help="Access key ID (env: LAW_GRAPH_S3_ACCESS_KEY_ID).",
    ),
    secret_access_key: str | None = Option(
        None,
        "--secret-access-key",
        help="Secret access key (env: LAW_GRAPH_S3_SECRET_ACCESS_KEY).",
    ),
    session_token: str | None = Option(
        None,
        "--session-token",
        help="Session token (env: LAW_GRAPH_S3_SESSION_TOKEN).",
    ),
    max_pool_connections: int | None = Option(
        None,
        "--max-pool-connections",
        min=1,
        help="Max S3 client connection pool size (env: LAW_GRAPH_S3_MAX_POOL_CONNECTIONS).",
    ),
    prefixes: list[str] | None = Option(
        None,
        "--prefix",
        help="Only delete keys under these prefixes; repeat for multiple prefixes.",
    ),
    yes: bool = Option(
        False,
        "--yes",
        help="Required confirmation flag to execute deletion.",
    ),
    verbose: bool = Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose logging",
    ),
    log_file: Path | None = Option(
        None,
        "--log-file",
        help="Optional file path to write purge logs.",
    ),
) -> None

Delete all objects under selected prefixes in object storage.

Source code in src/law_graph/cli.py
@storage_app.command("purge")
def storage_purge(
    bucket: str | None = typer.Option(
        None,
        "--bucket",
        help="Target object storage bucket (env: LAW_GRAPH_S3_BUCKET).",
    ),
    endpoint_url: str | None = typer.Option(
        None,
        "--endpoint-url",
        help="S3-compatible endpoint URL (env: LAW_GRAPH_S3_ENDPOINT_URL).",
    ),
    region: str | None = typer.Option(
        None,
        "--region",
        help="Bucket region (env: LAW_GRAPH_S3_REGION).",
    ),
    access_key_id: str | None = typer.Option(
        None,
        "--access-key-id",
        help="Access key ID (env: LAW_GRAPH_S3_ACCESS_KEY_ID).",
    ),
    secret_access_key: str | None = typer.Option(
        None,
        "--secret-access-key",
        help="Secret access key (env: LAW_GRAPH_S3_SECRET_ACCESS_KEY).",
    ),
    session_token: str | None = typer.Option(
        None,
        "--session-token",
        help="Session token (env: LAW_GRAPH_S3_SESSION_TOKEN).",
    ),
    max_pool_connections: int | None = typer.Option(
        None,
        "--max-pool-connections",
        min=1,
        help="Max S3 client connection pool size (env: LAW_GRAPH_S3_MAX_POOL_CONNECTIONS).",
    ),
    prefixes: list[str] | None = typer.Option(
        None,
        "--prefix",
        help="Only delete keys under these prefixes; repeat for multiple prefixes.",
    ),
    yes: bool = typer.Option(
        False,
        "--yes",
        help="Required confirmation flag to execute deletion.",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose logging",
    ),
    log_file: Path | None = typer.Option(
        None,
        "--log-file",
        help="Optional file path to write purge logs.",
    ),
) -> None:
    """Delete all objects under selected prefixes in object storage."""
    # NOTE: the docstring above doubles as the CLI help text, so it is kept
    # deliberately short; implementation notes live in comments instead.

    # Safety gate: bail out before any env loading, logging setup or S3 client
    # creation unless the caller explicitly confirmed with --yes.
    if not yes:
        raise typer.BadParameter("Pass --yes to confirm deleting remote objects.", param_hint="--yes")

    load_dotenv()
    setup_logging(
        level="DEBUG" if verbose else "INFO",
        log_file=log_file,
        log_name="storage-purge",
    )

    s3_config = _build_s3_config(
        bucket=bucket,
        endpoint_url=endpoint_url,
        region=region,
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
        session_token=session_token,
        max_pool_connections=max_pool_connections,
    )

    # No --prefix given (None or empty) -> a single empty-string prefix.
    # NOTE(review): presumably the empty prefix matches every key in the
    # bucket -- confirm against purge_bucket_prefixes.
    if prefixes:
        purge_targets = tuple(prefixes)
    else:
        purge_targets = ("",)

    s3_client = build_s3_client(s3_config)
    logger = get_logger(command="storage.purge")

    removed = purge_bucket_prefixes(
        client=s3_client,
        config=s3_config,
        prefixes=purge_targets,
    )

    logger.info(
        "Storage purge completed bucket={bucket} deleted={deleted}",
        bucket=s3_config.bucket,
        deleted=removed,
    )