def run_ingest(config: IngestConfig) -> IngestResult:
    """Convert raw payloads into canonical ingest rows using incremental apply/finalize.

    High-level flow:
      1. Open the download-manifest inventory under ``config.input_dir`` and the
         ingest-manifest store under ``config.output_dir`` (or an explicit path).
      2. Start a run record (``sync_run_lifecycle`` entered manually so ``run_id``
         is available to all nested closures; exited in the ``finally`` block).
      3. Stream act plans from the inventory, fan each plan's variants out to
         worker processes (or run inline when only one worker), accumulate
         per-act results in version order, and persist finalized builds plus
         progress/variant/version checkpoints to the ingest manifest.
      4. Optionally merge and save a "missing intervals" snapshot.

    Raises:
        ValueError: when the artifact inventory is empty.
        RuntimeError: on internal aggregation invariant violations, a failed
            act worker, or (when ``write_missing_intervals`` is set) when no
            acts were processed and no prior hints snapshot exists.

    Returns:
        IngestResult summarizing written acts, version counts, skips,
        source-combination counts, mismatches, and missing-interval totals.
    """
    log = get_logger(command="ingest")
    # Optional profiling: both cProfile and tracemalloc are enabled only when a
    # profile directory is configured; a report is written in the finally block.
    profile_dir = _profile_dir()
    parent_profiler: cProfile.Profile | None = cProfile.Profile() if profile_dir is not None else None
    parent_rss_start_kb = _current_process_rss_kb()
    parent_started_tracemalloc = False
    if profile_dir is not None:
        if not tracemalloc.is_tracing():
            tracemalloc.start(25)
            # Remember that *we* started tracing so we only stop what we started.
            parent_started_tracemalloc = True
    if parent_profiler is not None:
        parent_profiler.enable()
    inventory_db_path = config.input_dir / DOWNLOAD_MANIFEST_DB_FILENAME
    inventory_store = DownloadManifestDbStore(inventory_db_path)
    # Fail fast if any artifact referenced by the inventory is missing on disk.
    inventory_store.assert_artifact_paths_exist(root_dir=config.input_dir)
    total_manifest_entries = inventory_store.count_artifact_items()
    if total_manifest_entries <= 0:
        raise ValueError(f"No artifact inventory entries found in {inventory_db_path}")
    # Resolve the memory budget up front: explicit config wins (clamped to >= 1 MB),
    # otherwise a host-derived default.
    resolved_memory_budget_mb = (
        max(1, int(config.memory_budget_mb)) if config.memory_budget_mb is not None else _default_memory_budget_mb()
    )
    log.info(
        "Starting ingest run",
        input_dir=str(config.input_dir),
        output_dir=str(config.output_dir),
        total_manifest_entries=total_manifest_entries,
        force_reparse=config.force_reparse,
        limit=config.limit,
        act_concurrency=config.act_concurrency,
        memory_budget_mb=resolved_memory_budget_mb,
    )
    config.output_dir.mkdir(parents=True, exist_ok=True)
    ingest_manifest_path = config.ingest_manifest_path or (config.output_dir / INGEST_MANIFEST_DB_FILENAME)
    ingest_manifest_store = IngestManifestDbStore(ingest_manifest_path)
    # Previously ingested version states let us skip unchanged variants and
    # detect "json arrived later" upgrades during finalization.
    existing_versions_by_eli_id = ingest_manifest_store.load_version_states(
        source=config.origin_source,
        owner_alias_kind=config.owner_alias_kind,
    )
    download_digest = build_manifest_digest(inventory_db_path, None)

    def _start_run() -> str:
        # Record the run and its effective parameters; returns the new run id.
        return ingest_manifest_store.start_run(
            params={
                "limit": config.limit,
                "force_reparse": config.force_reparse,
                "include_raw_payloads": config.include_raw_payloads,
                "act_concurrency": config.act_concurrency,
                "act_batch_size": config.act_batch_size,
                "max_inflight_variants_per_act": config.max_inflight_variants_per_act,
                "memory_budget_mb": resolved_memory_budget_mb,
                "act_max_tasks": config.act_max_tasks,
                "write_missing_intervals": config.write_missing_intervals,
            },
            download_manifest_path=inventory_db_path,
            download_manifest_sha256=download_digest.sha256,
        )

    # Missing-interval "hints" state: the previous snapshot (if any) plus
    # per-act updates produced by this run; merged and saved at the end.
    existing_hint_records: dict[SourceOwnerAliasKey, list[dict[str, Any]]] = {}
    hint_updates: dict[SourceOwnerAliasKey, list[dict[str, Any]]] = {}
    hints_snapshot_preexisting = False
    existing_hint_metadata: MissingSnapshotMetadata | None = None
    if config.write_missing_intervals:
        existing_hint_metadata, existing_hint_records = ingest_manifest_store.load_latest_missing_snapshot()
        hints_snapshot_preexisting = existing_hint_metadata is not None
    # Run-wide counters, mutated via `nonlocal` by the closures below.
    written = 0
    acts_processed = 0
    acts_expected_total = 0
    manifest_entries_completed = 0
    versions_total = 0
    skipped_versions = 0
    combo_counts: Counter[frozenset[str]] = Counter()
    skip_stats: Counter[str] = Counter()
    items_with_mismatch = 0
    missing_intervals_total = 0
    # Entered manually (not `with`) so run_id is in scope for all closures;
    # __exit__ is invoked in the finally block with the live exception info.
    lifecycle_cm = sync_run_lifecycle(
        start_run=_start_run,
        finish_run=lambda state: ingest_manifest_store.finish_run(
            run_id=state.run_id,
            status=state.run_status,
            skip_stats=dict(skip_stats),
        ),
    )
    lifecycle_state = lifecycle_cm.__enter__()
    run_id = lifecycle_state.run_id
    act_worker_count = _default_workers(config.act_concurrency)
    worker_log_level = "INFO" if config.verbose else "WARNING"
    # Batch size is at least the worker count so the pool can stay saturated.
    requested_batch = config.act_batch_size or act_worker_count
    batch_size = max(act_worker_count, requested_batch, 1)
    if config.max_inflight_variants_per_act is None:
        per_act_inflight_limit = _default_per_act_inflight_limit(act_worker_count)
    else:
        per_act_inflight_limit = max(1, int(config.max_inflight_variants_per_act))
    # `act_max_tasks <= 0` means "no recycling" (maxtasksperchild=None).
    pool_max_tasks = config.act_max_tasks if (config.act_max_tasks is None or config.act_max_tasks > 0) else None
    memory_budget_kb = resolved_memory_budget_mb * 1024
    start_time = perf_counter()
    # In-flight per-act aggregation state, keyed by manifest id; entries are
    # removed when an act finalizes.
    aggregations: dict[str, _ActAggregation] = {}
    log_limiter = RateLimitedLogger(log=log)
    last_progress_log_at = start_time
    eta_seconds_ema: float | None = None
    eta_smoothing_alpha = 0.15
    progress_log_every_seconds = 5.0 if config.verbose else 30.0
    progress_log_every_acts = 50 if config.verbose else 500
    manifest_entries_target = (
        min(config.limit, total_manifest_entries) if config.limit is not None else total_manifest_entries
    )
    # Live memory accounting for the backpressure loop (multi-worker path).
    worker_pool_rss_live_kb = 0
    estimated_inflight_worker_kb = 0
    observed_worker_rss_kb_by_manifest: dict[str, int] = {}

    def _pending_variants() -> int:
        # Total variants still awaited across every in-flight act.
        return sum(sum(agg.pending_variants_by_version.values()) for agg in aggregations.values())

    def _maybe_log_progress(*, force: bool = False) -> None:
        """Log a throttled progress line with rates, ETA, and memory stats."""
        nonlocal eta_seconds_ema, last_progress_log_at
        now = perf_counter()
        should_log = force
        # Log on a completion-count interval or after a time interval, whichever fires.
        if not should_log and manifest_entries_completed > 0:
            if manifest_entries_completed % progress_log_every_acts == 0:
                should_log = True
            elif (now - last_progress_log_at) >= progress_log_every_seconds:
                should_log = True
        if not should_log:
            return
        elapsed = now - start_time
        written_rate = (written / elapsed) if elapsed > 0 else 0.0
        processed_rate = (acts_processed / elapsed) if elapsed > 0 else 0.0
        completion_rate = (manifest_entries_completed / elapsed) if elapsed > 0 else 0.0
        remaining_entries = max(0, manifest_entries_target - manifest_entries_completed)
        eta_seconds_raw: float | None = None
        if completion_rate > 0:
            eta_seconds_raw = remaining_entries / completion_rate
        if eta_seconds_raw is None:
            eta_seconds_ema = None
        elif eta_seconds_ema is None:
            eta_seconds_ema = eta_seconds_raw
        else:
            # Smooth ETA to reduce jitter from short-term throughput variance.
            eta_seconds_ema = (eta_smoothing_alpha * eta_seconds_raw) + ((1.0 - eta_smoothing_alpha) * eta_seconds_ema)
        parent_rss_kb = _current_process_rss_kb()
        scheduler_committed_kb = _scheduler_committed_memory_kb(
            worker_pool_rss_kb=worker_pool_rss_live_kb,
            estimated_inflight_kb=estimated_inflight_worker_kb,
        )
        log.info(
            "Ingest progress",
            acts_processed=acts_processed,
            acts_expected=acts_expected_total,
            manifest_entries_completed=manifest_entries_completed,
            manifest_entries_target=manifest_entries_target,
            total_manifest_entries=total_manifest_entries,
            written=written,
            versions_total=versions_total,
            pending_acts=len(aggregations),
            pending_variants=_pending_variants(),
            elapsed_seconds=round(elapsed, 2),
            acts_per_second=round(written_rate, 2),
            processed_acts_per_second=round(processed_rate, 2),
            eta_seconds=(round(eta_seconds_ema, 2) if eta_seconds_ema is not None else None),
            eta_hms=(_format_duration(eta_seconds_ema) if eta_seconds_ema is not None else None),
            parent_rss_mb=round(parent_rss_kb / 1024, 1),
            worker_pool_rss_mb=round(worker_pool_rss_live_kb / 1024, 1),
            memory_budget_mb=round(memory_budget_kb / 1024, 1),
            scheduler_memory_committed_mb=round(scheduler_committed_kb / 1024, 1),
            scheduler_total_observed_mb=round((parent_rss_kb + worker_pool_rss_live_kb) / 1024, 1),
        )
        last_progress_log_at = now

    def _register_plan(plan: _ActWorkPlan) -> None:
        """Create the aggregation record for a plan and checkpoint 'parsing' start."""
        # First variant (in processing order) that has an ELI id becomes the
        # act's progress identity; None when no variant resolves one.
        progress_eli_id = next(
            (
                plan.eli_id_by_variant.get(variant_id)
                for variant_id in plan.variants_to_process
                if plan.eli_id_by_variant.get(variant_id) is not None
            ),
            None,
        )
        pending_by_version: Counter[str] = Counter()
        for variant_id in plan.variants_to_process:
            variant = plan.variants_by_id.get(variant_id)
            if variant is None:
                raise RuntimeError(f"Unknown variant id {variant_id} for {plan.manifest_id}")
            pending_by_version[variant.version] += 1
        aggregations[plan.manifest_id] = _ActAggregation(
            plan=plan,
            expected_variants=len(plan.variants_to_process),
            progress_eli_id=progress_eli_id,
            pending_variants_by_version=dict(pending_by_version),
            # Versions are applied to the builder in this sorted order.
            ordered_versions=tuple(sorted(pending_by_version, key=version_tag_sort_key)),
        )
        if progress_eli_id is None:
            # Without an ELI id we cannot write a progress checkpoint row.
            skip_stats["missing_eli_id_progress_start"] += 1
            return
        ingest_manifest_store.commit_parse_side_effects(
            source=config.origin_source,
            owner_alias_kind=config.owner_alias_kind,
            run_id=run_id,
            progress_checkpoints=[
                IngestProgressCheckpointInput(
                    owner_alias_value=progress_eli_id,
                    phase="parsing",
                    last_variant_id=None,
                    variants_total=len(plan.variants_to_process),
                    variants_done=0,
                    status="running",
                )
            ],
        )

    def _drain_ready_events(aggregation: _ActAggregation) -> None:
        """Apply buffered parse events to the builder, strictly in version order.

        Stops at the first version that still has pending (unfinished) variants
        so out-of-order worker completions never reach the builder early.
        """
        while aggregation.next_version_index < len(aggregation.ordered_versions):
            version_tag = aggregation.ordered_versions[aggregation.next_version_index]
            if aggregation.pending_variants_by_version.get(version_tag, 0) > 0:
                return
            for event in aggregation.buffered_events_by_version.pop(version_tag, ()):
                aggregation.builder.apply(event)
            aggregation.next_version_index += 1

    def _collect_variant_side_effect_rows(
        *,
        plan: _ActWorkPlan,
        aggregation: _ActAggregation,
    ) -> tuple[list[IngestVariantEventCheckpointInput], list[IngestVersionStateCheckpointInput]]:
        """Build the variant-event and version-state checkpoint rows for one act."""
        variant_checkpoints = [
            IngestVariantEventCheckpointInput(
                owner_alias_value=owner_alias_value,
                variant_id=str(event_payload["variant_id"]),
                version_tag=str(event_payload["version_tag"]),
                parse_status=str(event_payload["parse_status"]),
                variant_fingerprint=str(event_payload["variant_fingerprint"]),
                skip_reason=(
                    str(event_payload["skip_reason"]) if event_payload.get("skip_reason") is not None else None
                ),
                metrics=(dict(event_payload["metrics"]) if event_payload.get("metrics") is not None else None),
            )
            for owner_alias_value, event_payload in aggregation.variant_event_rows
        ]
        if variant_checkpoints:
            skip_stats["variant_events_written"] += len(variant_checkpoints)
        version_checkpoints: list[IngestVersionStateCheckpointInput] = []
        for version_tag, variant_id, _variant_metadata in aggregation.version_entries:
            sources = plan.source_hashes_by_variant.get(variant_id, {})
            eli_id = plan.eli_id_by_variant.get(variant_id)
            if eli_id is None:
                # Version state rows require an owner alias; count and skip.
                skip_stats["missing_eli_id_version_state_skips"] += 1
                continue
            version_checkpoints.append(
                IngestVersionStateCheckpointInput(
                    owner_alias_value=eli_id,
                    version_tag=version_tag,
                    source_hashes=sources,
                )
            )
        return variant_checkpoints, version_checkpoints

    def _finalize_aggregation(aggregation: _ActAggregation) -> None:
        """Finish one act: drain events, persist the build (if any) and checkpoints.

        Three exits: (a) no parsed variant events -> checkpoints only, no build;
        (b) build version has no ELI id -> checkpoints only; (c) full build
        persisted plus a 'finalized' progress checkpoint.
        """
        nonlocal \
            written, \
            acts_processed, \
            manifest_entries_completed, \
            versions_total, \
            skipped_versions, \
            missing_intervals_total
        plan = aggregation.plan
        acts_processed += 1
        manifest_entries_completed += 1
        skipped_versions += aggregation.skipped_versions
        versions_total += len(aggregation.version_entries)
        _drain_ready_events(aggregation)
        # Invariant: at finalization every version must have been drained.
        if aggregation.next_version_index != len(aggregation.ordered_versions):
            pending_versions = [
                version_tag for version_tag, count in aggregation.pending_variants_by_version.items() if count > 0
            ]
            raise RuntimeError(
                f"Aggregation finalized with pending versions for {plan.manifest_id}: {pending_versions}"
            )
        variant_checkpoints, version_checkpoints = _collect_variant_side_effect_rows(
            plan=plan,
            aggregation=aggregation,
        )
        if not aggregation.builder.state.input_versions:
            # Nothing usable was parsed: record checkpoints but skip the build.
            skip_stats["no_parsed_variant_events"] += 1
            skip_stats["acts_completed_no_build"] += 1
            progress_eli_id = aggregation.progress_eli_id
            if progress_eli_id is not None:
                ingest_manifest_store.commit_parse_side_effects(
                    source=config.origin_source,
                    owner_alias_kind=config.owner_alias_kind,
                    run_id=run_id,
                    variant_event_checkpoints=variant_checkpoints,
                    version_state_checkpoints=version_checkpoints,
                    progress_checkpoints=[
                        IngestProgressCheckpointInput(
                            owner_alias_value=progress_eli_id,
                            phase="parsed_no_build",
                            last_variant_id=(
                                aggregation.version_entries[-1][1] if aggregation.version_entries else None
                            ),
                            variants_total=aggregation.expected_variants,
                            variants_done=aggregation.completed_variants,
                            status="completed",
                        )
                    ],
                )
            else:
                ingest_manifest_store.commit_parse_side_effects(
                    source=config.origin_source,
                    owner_alias_kind=config.owner_alias_kind,
                    run_id=run_id,
                    variant_event_checkpoints=variant_checkpoints,
                    version_state_checkpoints=version_checkpoints,
                )
                skip_stats["missing_eli_id_progress_no_build"] += 1
            return
        bundle = aggregation.builder.finalize()
        # The build is attributed to the highest input version's ELI id.
        build_version_tag = max(bundle.input_versions, key=version_tag_sort_key)
        build_eli_id = plan.eli_id_by_version.get(build_version_tag)
        if build_eli_id is None:
            ingest_manifest_store.commit_parse_side_effects(
                source=config.origin_source,
                owner_alias_kind=config.owner_alias_kind,
                run_id=run_id,
                variant_event_checkpoints=variant_checkpoints,
                version_state_checkpoints=version_checkpoints,
            )
            skip_stats["missing_build_eli_id"] += 1
            return
        build_variants = plan.variants_by_version.get(build_version_tag, ())
        previous_hashes = existing_versions_by_eli_id.get(build_eli_id, {}).get(build_version_tag, {})
        # Detect acts upgraded because a JSON source appeared that the prior
        # ingested state did not have.
        if "json" not in previous_hashes and any("json" in variant.source_hashes for variant in build_variants):
            skip_stats["acts_upgraded_after_json_arrival"] += 1
        # Deduplicate node interval identities for the count stored below.
        interval_node_keys = {(str(row["node_type"]), str(row["node_id"])) for row in bundle.node_interval_rows}
        variant_diagnostic: dict[str, Any] = {
            "has_conflict": len(build_variants) > 1,
            "variant_count": max(1, len(build_variants)),
            "provenance": [
                {
                    "variant_id": variant.variant_id,
                    "variant_sha": variant.variant_sha,
                    "variant_path": variant.variant_path,
                    "variant_format": variant.variant_format,
                    "source_hashes": variant.source_hashes,
                    "eli_id": variant.eli_id,
                }
                for variant in build_variants
            ],
        }
        ingest_manifest_store.persist_finalized_build(
            source=config.origin_source,
            owner_alias_kind=config.owner_alias_kind,
            run_id=run_id,
            payload=IngestFinalizedBuildPayloadInput(
                owner_alias_value=build_eli_id,
                urn_nir=bundle.urn_nir,
                version_tag=build_version_tag,
                input_versions=list(bundle.input_versions),
                interval_node_count=len(interval_node_keys),
                content_blob_count=len(bundle.content_blob_rows),
                original_payloads=(aggregation.raw_payloads_by_version if config.include_raw_payloads else None),
                variant_diagnostic=variant_diagnostic,
                # Rows are normalized to plain str/list/dict values for storage.
                node_intervals=[
                    {
                        "node_type": str(row["node_type"]),
                        "node_id": str(row["node_id"]),
                        "interval_id": str(row["interval_id"]),
                        "start_date": (str(row["start_date"]) if row["start_date"] is not None else None),
                        "end_date": (str(row["end_date"]) if row["end_date"] is not None else None),
                        "content_version_id": (
                            str(row["content_version_id"]) if row["content_version_id"] is not None else None
                        ),
                        "missing_reason": (str(row["missing_reason"]) if row["missing_reason"] is not None else None),
                        "observations": list(row["observations"]),
                        "legal_updates": list(row["legal_updates"]),
                    }
                    for row in bundle.node_interval_rows
                ],
                content_blobs=[
                    {
                        "content_version_id": str(row["content_version_id"]),
                        "node_type": str(row["node_type"]),
                        "payload": dict(row["payload"]),
                    }
                    for row in bundle.content_blob_rows
                ],
                metadata_deltas=[
                    {
                        "urn_nir": str(row["urn_nir"]),
                        "effective_date": str(row["effective_date"]),
                        "set_values": (dict(row["set_values"]) if row["set_values"] is not None else None),
                        "unset_values": (list(row["unset_values"]) if row["unset_values"] is not None else None),
                    }
                    for row in bundle.metadata_delta_rows
                ],
            ),
        )
        missing_rows = [
            {
                "owner_alias_value": build_eli_id,
                "urn_nir": str(row["urn_nir"]),
                "node_type": str(row["node_type"]),
                "node_id": str(row["node_id"]),
                "start_date": (str(row["start_date"]) if row["start_date"] is not None else None),
                "end_date": (str(row["end_date"]) if row["end_date"] is not None else None),
                "reason": (str(row["reason"]) if row["reason"] is not None else None),
            }
            for row in bundle.missing_interval_rows
        ]
        missing_intervals_total += len(missing_rows)
        if config.write_missing_intervals:
            # Empty row lists are recorded too: they clear any prior hints for
            # this owner during the merge at the end of the run.
            hint_updates[
                SourceOwnerAliasKey(
                    source=config.origin_source,
                    owner_alias_kind=config.owner_alias_kind,
                    owner_alias_value=build_eli_id,
                )
            ] = missing_rows
        ingest_manifest_store.commit_parse_side_effects(
            source=config.origin_source,
            owner_alias_kind=config.owner_alias_kind,
            run_id=run_id,
            variant_event_checkpoints=variant_checkpoints,
            version_state_checkpoints=version_checkpoints,
            progress_checkpoints=[
                IngestProgressCheckpointInput(
                    owner_alias_value=build_eli_id,
                    phase="finalized",
                    last_variant_id=(aggregation.version_entries[-1][1] if aggregation.version_entries else None),
                    variants_total=aggregation.expected_variants,
                    variants_done=aggregation.completed_variants,
                    status="completed",
                )
            ],
        )
        written += 1
        combo_counts[frozenset(aggregation.sources_seen)] += 1
        _maybe_log_progress()

    def _maybe_finalize(aggregation: _ActAggregation) -> None:
        # Finalize only once every expected variant has reported a result.
        if aggregation.completed_variants < aggregation.expected_variants:
            return
        # Remove from the in-flight map first so pending counts stay accurate
        # even if finalization raises.
        aggregations.pop(aggregation.plan.manifest_id, None)
        _finalize_aggregation(aggregation)

    def _accumulate_result(plan: _ActWorkPlan, result: _ActWorkerResult) -> None:
        """Merge one worker result into its act's aggregation.

        NOTE(review): in the multi-worker path this runs inside the pool's
        ``apply_async`` callback (multiprocessing's result-handler thread),
        not the main thread — the main thread is parked on the result queue
        while these mutations happen; verify this stays true if the dispatch
        loop is ever changed.
        """
        aggregation = aggregations.get(plan.manifest_id)
        if aggregation is None:
            raise RuntimeError(f"Missing aggregation for {plan.manifest_id}")
        aggregation.completed_variants += 1
        aggregation.duration_seconds += result.duration_seconds
        aggregation.worker_rss_kb_max = max(aggregation.worker_rss_kb_max, result.worker_rss_kb)
        # Track the largest RSS seen per manifest; used as a floor for the
        # memory estimate of later variants of the same act.
        observed_worker_rss_kb_by_manifest[plan.manifest_id] = max(
            observed_worker_rss_kb_by_manifest.get(plan.manifest_id, 0),
            int(result.worker_rss_kb),
        )
        aggregation.skipped_versions += result.skipped_versions
        aggregation.sources_seen.update(result.sources_seen)
        aggregation.version_entries.extend(result.version_entries_all)
        variant_eli_id = plan.eli_id_by_variant.get(result.variant_id)
        if variant_eli_id is not None:
            aggregation.variant_event_rows.append(
                (
                    variant_eli_id,
                    {
                        "variant_id": result.variant_id,
                        "version_tag": result.version_tag,
                        "parse_status": result.parse_status,
                        "variant_fingerprint": result.variant_sha,
                        "skip_reason": result.skip_reason,
                        "metrics": result.metrics,
                    },
                )
            )
            if result.parse_status == "skipped" and result.skip_reason == "missing_required_source_fields":
                skip_stats["skipped_missing_required_source_fields"] += 1
        else:
            skip_stats["missing_eli_id_variant_events"] += 1
        if result.raw_payloads_by_version:
            aggregation.raw_payloads_by_version.update(result.raw_payloads_by_version)
        pending_count = aggregation.pending_variants_by_version.get(result.version_tag)
        if pending_count is None:
            raise RuntimeError(f"Unexpected version tag {result.version_tag} for {plan.manifest_id}")
        if pending_count <= 0:
            raise RuntimeError(f"Duplicate result received for version {result.version_tag} in {plan.manifest_id}")
        aggregation.pending_variants_by_version[result.version_tag] = pending_count - 1
        for parsed_event in result.parsed_events:
            # Workers must only emit events for the version they were assigned.
            if parsed_event.version_tag != result.version_tag:
                raise RuntimeError(
                    f"Parsed version {parsed_event.version_tag} differs from task version "
                    f"{result.version_tag} for {plan.manifest_id}"
                )
            aggregation.buffered_events_by_version.setdefault(result.version_tag, []).append(parsed_event)
        _drain_ready_events(aggregation)
        # Variant events are persisted once at finalization time to avoid
        # per-variant DB round-trips during the hot accumulation loop.
        _maybe_finalize(aggregation)

    def _plan_iterator() -> Iterable[_ActWorkPlan]:
        """Yield work plans from the inventory, skipping fully-unchanged acts."""
        nonlocal items_with_mismatch, skipped_versions, acts_expected_total, manifest_entries_completed
        for item_id, payload in inventory_store.iter_artifact_items(limit=config.limit):
            prepared = _prepare_act_plan(
                item_id=item_id,
                payload=payload,
                formats=("json", "akn", "nir"),
                snapshot_adapter=config.snapshot_adapter,
                existing_versions_by_eli_id=existing_versions_by_eli_id,
                force_reparse=config.force_reparse,
                log_limiter=log_limiter,
            )
            if prepared is None:
                # Entry produced no plan at all; still counts toward progress.
                manifest_entries_completed += 1
                continue
            plan, mismatch = prepared
            if mismatch:
                items_with_mismatch += 1
            skipped_versions += plan.pre_skipped_variants
            if not plan.variants_to_process:
                # Everything already ingested and unchanged: nothing to dispatch.
                if plan.pre_skipped_variants > 0:
                    skip_stats["all_variants_unchanged_acts"] += 1
                manifest_entries_completed += 1
                continue
            acts_expected_total += 1
            yield plan

    try:
        plans = _plan_iterator()
        if act_worker_count == 1:
            # Single-worker fast path: run each variant inline, no pool overhead.
            for plan in plans:
                _register_plan(plan)
                for variant_id in plan.variants_to_process:
                    variant = plan.variants_by_id[variant_id]
                    result = _run_act_worker(
                        _ActWorkerTask(
                            input_dir=config.input_dir,
                            manifest_id=plan.manifest_id,
                            version=variant.version,
                            variant_id=variant.variant_id,
                            variant_sha=variant.variant_sha,
                            variant_path=variant.variant_path,
                            variant_format=variant.variant_format,
                            json_path=variant.json_path,
                            akn_path=variant.akn_path,
                            nir_path=variant.nir_path,
                            variant_count_for_version=variant.variant_count_for_version,
                            include_raw_payloads=config.include_raw_payloads,
                            worker_log_level=worker_log_level,
                            snapshot_adapter=config.snapshot_adapter,
                        )
                    )
                    _accumulate_result(plan, result)
        else:
            # Multi-worker path: spawn-based pool plus a thread-safe queue that
            # the apply_async callbacks use to signal completions back here.
            ctx = multiprocessing.get_context("spawn")
            result_queue: Queue[tuple[str, _ActWorkPlan, str | None, BaseException | None]] = Queue()
            pending_count = 0
            pending_by_manifest: Counter[str] = Counter()
            estimated_inflight_kb_by_variant: dict[str, int] = {}

            def _enqueue_success(plan: _ActWorkPlan):
                # Callback runs in the pool's result-handler thread: accumulate
                # there, then signal the main dispatch loop via the queue.
                def _callback(result: _ActWorkerResult) -> None:
                    try:
                        _accumulate_result(plan, result)
                    except BaseException as exc:  # pragma: no cover
                        result_queue.put(("error", plan, result.variant_id, exc))
                        return
                    result_queue.put(("ok", plan, result.variant_id, None))

                return _callback

            def _enqueue_error(plan: _ActWorkPlan, *, variant_id: str):
                # variant_id is bound here because the error callback only
                # receives the exception.
                def _callback(exc: BaseException) -> None:
                    result_queue.put(("error", plan, variant_id, exc))

                return _callback

            def _drain_one(*, timeout_seconds: float | None = None) -> bool:
                """Consume one completion from the queue; False on timeout/empty."""
                nonlocal pending_count, worker_pool_rss_live_kb, estimated_inflight_worker_kb
                if pending_count == 0:
                    return False
                try:
                    if timeout_seconds is None:
                        kind, plan, completed_variant_id, payload = result_queue.get()
                    else:
                        kind, plan, completed_variant_id, payload = result_queue.get(timeout=timeout_seconds)
                except Empty:
                    # Emit log-only progress while the parent waits on large
                    # parse tasks without reintroducing hot-loop DB writes.
                    worker_pool_rss_live_kb = _worker_pool_rss_kb(pool)
                    estimated_inflight_worker_kb = sum(estimated_inflight_kb_by_variant.values())
                    _maybe_log_progress(force=True)
                    return False
                pending_count -= 1
                pending_by_manifest[plan.manifest_id] = max(0, pending_by_manifest[plan.manifest_id] - 1)
                if completed_variant_id is not None:
                    estimated_inflight_kb_by_variant.pop(completed_variant_id, None)
                worker_pool_rss_live_kb = _worker_pool_rss_kb(pool)
                estimated_inflight_worker_kb = sum(estimated_inflight_kb_by_variant.values())
                if kind == "error":
                    raise RuntimeError(f"Act worker failed for {plan.manifest_id}: {payload}") from payload
                return True

            with ctx.Pool(processes=act_worker_count, maxtasksperchild=pool_max_tasks) as pool:
                for plan in plans:
                    _register_plan(plan)
                    # Heavy acts may get a tighter inflight cap than the default.
                    local_limit = _per_act_dispatch_limit(
                        plan=plan,
                        default_limit=min(batch_size, per_act_inflight_limit),
                        memory_budget_kb=memory_budget_kb,
                    )
                    if local_limit < min(batch_size, per_act_inflight_limit):
                        log.info(
                            "Reduced per-act inflight limit for heavy act",
                            manifest_id=plan.manifest_id,
                            local_limit=local_limit,
                            max_estimated_variant_memory_mb=round(plan.max_estimated_variant_memory_kb / 1024, 1),
                            total_input_mb=round(plan.total_input_bytes / 1024 / 1024, 1),
                        )
                    for variant_id in plan.variants_to_process:
                        # Estimate: static plan estimate, floored by the largest
                        # RSS actually observed for this manifest so far.
                        estimated_variant_kb = max(
                            plan.estimated_memory_kb_by_variant.get(variant_id, 0),
                            observed_worker_rss_kb_by_manifest.get(plan.manifest_id, 0),
                        )
                        # Backpressure: block dispatch while over the batch cap,
                        # the per-act cap, or the memory budget; draining one
                        # completion frees capacity (or times out to log progress).
                        while True:
                            worker_pool_rss_live_kb = _worker_pool_rss_kb(pool)
                            estimated_inflight_worker_kb = sum(estimated_inflight_kb_by_variant.values())
                            scheduler_committed_kb = _scheduler_committed_memory_kb(
                                worker_pool_rss_kb=worker_pool_rss_live_kb,
                                estimated_inflight_kb=estimated_inflight_worker_kb,
                            )
                            over_batch_limit = pending_count >= batch_size
                            over_act_limit = pending_by_manifest[plan.manifest_id] >= local_limit
                            over_memory_budget = (
                                pending_count > 0 and (scheduler_committed_kb + estimated_variant_kb) > memory_budget_kb
                            )
                            if not (over_batch_limit or over_act_limit or over_memory_budget):
                                break
                            _drain_one(timeout_seconds=progress_log_every_seconds)
                        variant = plan.variants_by_id[variant_id]
                        pool.apply_async(
                            _run_act_worker,
                            (
                                _ActWorkerTask(
                                    input_dir=config.input_dir,
                                    manifest_id=plan.manifest_id,
                                    version=variant.version,
                                    variant_id=variant.variant_id,
                                    variant_sha=variant.variant_sha,
                                    variant_path=variant.variant_path,
                                    variant_format=variant.variant_format,
                                    json_path=variant.json_path,
                                    akn_path=variant.akn_path,
                                    nir_path=variant.nir_path,
                                    variant_count_for_version=variant.variant_count_for_version,
                                    include_raw_payloads=config.include_raw_payloads,
                                    worker_log_level=worker_log_level,
                                    snapshot_adapter=config.snapshot_adapter,
                                ),
                            ),
                            callback=_enqueue_success(plan),
                            error_callback=_enqueue_error(plan, variant_id=variant_id),
                        )
                        pending_count += 1
                        pending_by_manifest[plan.manifest_id] += 1
                        estimated_inflight_kb_by_variant[variant_id] = estimated_variant_kb
                        estimated_inflight_worker_kb = sum(estimated_inflight_kb_by_variant.values())
                # All plans dispatched; wait out the remaining completions.
                while pending_count:
                    _drain_one(timeout_seconds=progress_log_every_seconds)
        if config.write_missing_intervals:
            ingest_digest = MissingSnapshotDigest(
                path=ingest_manifest_path.resolve(strict=False).as_posix(),
                sha256=compute_file_sha256(ingest_manifest_path),
            )
            if acts_processed == 0:
                # Nothing new this run: carry the previous snapshot forward, or
                # fail if there is none to carry.
                if hints_snapshot_preexisting:
                    merged_hints = {key: list(rows) for key, rows in existing_hint_records.items()}
                else:
                    raise RuntimeError("No existing hints; run ingest with --force-reparse.")
            else:
                # force_reparse rebuilds hints from scratch; otherwise start
                # from the previous snapshot and overlay this run's updates.
                merged_hints = (
                    {} if config.force_reparse else {key: list(rows) for key, rows in existing_hint_records.items()}
                )
            for owner_key, rows in hint_updates.items():
                if rows:
                    merged_hints[owner_key] = list(rows)
                else:
                    # An empty update means the act no longer has missing intervals.
                    merged_hints.pop(owner_key, None)
            processed_all = config.limit is None and acts_expected_total > 0 and acts_processed >= acts_expected_total
            previous_metadata_complete = bool(existing_hint_metadata and existing_hint_metadata.complete)
            previous_manifest_match = bool(
                existing_hint_metadata
                and digests_match(
                    download_digest,
                    build_manifest_digest(Path(existing_hint_metadata.download_manifest.path), None),
                )
            )
            # The snapshot is "complete" only for unlimited runs that either
            # processed everything or can incrementally extend a complete
            # prior snapshot over the same (or re-processed) manifest.
            incremental_ready = bool(
                config.limit is None
                and (
                    config.force_reparse
                    or (
                        hints_snapshot_preexisting
                        and previous_metadata_complete
                        and (acts_processed > 0 or previous_manifest_match)
                    )
                )
            )
            complete = config.limit is None and (processed_all or incremental_ready)
            if complete:
                acts_expected = acts_expected_total
            elif config.limit is not None:
                acts_expected = min(config.limit, acts_expected_total or total_manifest_entries)
            else:
                acts_expected = acts_expected_total or total_manifest_entries
            metadata = MissingSnapshotMetadata(
                download_manifest=MissingSnapshotDigest(
                    path=download_digest.path,
                    sha256=download_digest.sha256,
                    updated_at=download_digest.updated_at,
                ),
                ingest_manifest=ingest_digest,
                acts_total=(acts_expected if complete else acts_processed),
                acts_with_missing=len(merged_hints),
                missing_intervals_total=sum(len(rows) for rows in merged_hints.values()),
                acts_expected=acts_expected,
                complete=complete,
                schema_version="1.0",
                generated_at=datetime.now(tz=UTC).isoformat(),
            )
            ingest_manifest_store.save_missing_snapshot(
                run_id=run_id,
                source=config.origin_source,
                owner_alias_kind=config.owner_alias_kind,
                metadata=metadata,
                # Deterministic ordering for stable snapshot content.
                hints_by_source_owner_alias=dict(
                    sorted(
                        merged_hints.items(),
                        key=lambda item: (
                            item[0].source,
                            item[0].owner_alias_kind,
                            item[0].owner_alias_value,
                        ),
                    )
                ),
            )
    except Exception:
        lifecycle_state.run_status = "failed"
        raise
    finally:
        try:
            log_limiter.flush(summary_message="Hot-loop logs suppressed")
            if profile_dir is not None and parent_profiler is not None:
                parent_profiler.disable()
                alloc_snapshot = tracemalloc.take_snapshot() if tracemalloc.is_tracing() else None
                # Only stop tracemalloc if this function started it.
                if parent_started_tracemalloc and tracemalloc.is_tracing():
                    tracemalloc.stop()
                _write_profile_report(
                    report_path=profile_dir / f"parent_{os.getpid()}_run_{run_id}.json",
                    profiler=parent_profiler,
                    alloc_snapshot=alloc_snapshot,
                    rss_start_kb=parent_rss_start_kb,
                    rss_end_kb=_current_process_rss_kb(),
                    extra={
                        "run_id": run_id,
                        "status": lifecycle_state.run_status,
                        "acts_processed": acts_processed,
                        "versions_total": versions_total,
                        "written": written,
                        "items_with_mismatch": items_with_mismatch,
                        "missing_intervals_total": missing_intervals_total,
                    },
                )
        finally:
            # Close the manually-entered lifecycle CM with the live exception
            # info (all zeros/None on the success path).
            lifecycle_cm.__exit__(*sys.exc_info())
    # Success path only: the finally above re-raises any in-flight exception.
    elapsed = perf_counter() - start_time
    _maybe_log_progress(force=True)
    log.info(
        "Ingest throughput",
        elapsed_seconds=round(elapsed, 2),
        acts_per_second=round((written / elapsed) if elapsed > 0 else 0.0, 2),
        versions_per_second=round((versions_total / elapsed) if elapsed > 0 else 0.0, 2),
    )
    log.info(
        "Ingest skip summary",
        skipped_variants_recorded=int(skip_stats.get("skipped_missing_required_source_fields", 0)),
        no_build_acts=int(skip_stats.get("acts_completed_no_build", 0)),
        variant_events_written=int(skip_stats.get("variant_events_written", 0)),
        acts_upgraded_after_json_arrival=int(skip_stats.get("acts_upgraded_after_json_arrival", 0)),
    )
    return IngestResult(
        written=written,
        versions_total=versions_total,
        skipped_versions=skipped_versions,
        combo_counts=combo_counts,
        items_with_mismatch=items_with_mismatch,
        missing_intervals_total=missing_intervals_total,
    )