Rust Ingestion Service¶
Runtime Responsibility¶
moraine-ingest is the sole writer that transforms JSONL lines from configured Codex and Claude trace sources into canonical event rows. It owns file-change detection, append-safe consumption, schema-aware normalization, and batched persistence with checkpoint advancement. Although retrieval runs elsewhere, this service defines the invariants retrieval depends on: stable event identity and source provenance. main.rs:L7 · ingestor.rs:L45 · normalize.rs:L336 · moraine.toml:L21-33
The process starts on a multi-threaded Tokio runtime with a TOML config. At startup it pings ClickHouse, loads checkpoint state from ingest_checkpoints, instantiates channels and concurrency guards, and then starts watcher, debounce, reconcile, worker, sink, and heartbeat loops. The startup sequence intentionally front-loads dependency failure: if ClickHouse is unreachable, ingestion does not enter a partially alive state. main.rs:L28 · ingestor.rs:L47 · ingestor.rs:L49 · ingestor.rs:L145
Execution Model¶
The runtime is a hybrid of event-driven and reconciliation-driven scheduling. A watcher thread forwards file events to debounce logic, while reconcile periodically scans the full sessions glob. This dual path exists because watcher streams can drop events; reconcile guarantees eventual reinspection. ingestor.rs:L187 · ingestor.rs:L230 · ingestor.rs:L284 · ingestor.rs:L752
Dispatch state tracks three sets: pending, inflight, and dirty. Updates that arrive while a path is inflight are marked dirty, then replayed immediately after the inflight read completes. This preserves edits that race with long reads without unbounded queue growth. ingestor.rs:L23 · ingestor.rs:L124 · ingestor.rs:L135 · ingestor.rs:L536
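The pending/inflight/dirty transition can be sketched as a small state machine. This is a minimal illustration, not the actual ingestor.rs types; the struct and method names are hypothetical.

```rust
use std::collections::HashSet;

// Hypothetical sketch of the pending/inflight/dirty dispatch state.
#[derive(Default)]
struct DispatchState {
    pending: HashSet<String>,
    inflight: HashSet<String>,
    dirty: HashSet<String>,
}

impl DispatchState {
    // A debounced file event arrives for `path`.
    fn on_event(&mut self, path: &str) {
        if self.inflight.contains(path) {
            // Racing update: remember it, replay after the current read.
            self.dirty.insert(path.to_string());
        } else {
            self.pending.insert(path.to_string());
        }
    }

    // A worker picks up a path: pending -> inflight.
    fn start(&mut self, path: &str) -> bool {
        if self.pending.remove(path) {
            self.inflight.insert(path.to_string());
            true
        } else {
            false
        }
    }

    // Worker finished: if the path went dirty mid-read, requeue immediately.
    fn finish(&mut self, path: &str) {
        self.inflight.remove(path);
        if self.dirty.remove(path) {
            self.pending.insert(path.to_string());
        }
    }
}

fn main() {
    let mut s = DispatchState::default();
    s.on_event("a.jsonl");
    assert!(s.start("a.jsonl"));
    s.on_event("a.jsonl"); // arrives while inflight -> marked dirty
    s.finish("a.jsonl");   // replayed: back in pending
    assert!(s.pending.contains("a.jsonl"));
}
```

Because dirty is a set rather than a queue, any number of racing updates collapse into exactly one replay, which is what prevents queue blow-up.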
Concurrency is bounded by a semaphore (max_file_workers) and channel capacities (max_inflight_batches, derived process queue capacity). Worker tasks are spawned per file with owned semaphore permits, and sink writes are serialized by a single sink task. This architecture avoids lock-heavy shared mutable write paths while still allowing high parallelism in read/parse/normalize stages. ingestor.rs:L60 · ingestor.rs:L62 · ingestor.rs:L74 · ingestor.rs:L106
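The fan-in shape (parallel workers, single serialized sink) can be sketched with std threads and a bounded channel standing in for the Tokio primitives; names and the capacity value here are illustrative, not the service's actual configuration.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Sketch: parallel parse workers feed one sink through a bounded channel,
// so writes are serialized without shared mutable state on the write path.
fn main() {
    // Bounded capacity stands in for max_inflight_batches.
    let (tx, rx) = sync_channel::<Vec<String>>(4);

    let mut workers = Vec::new();
    for w in 0..3 {
        let tx = tx.clone();
        workers.push(thread::spawn(move || {
            // Each worker parses "its file" and sends a row batch.
            let batch = vec![format!("row-from-worker-{w}")];
            tx.send(batch).unwrap();
        }));
    }
    drop(tx); // close the channel once all worker senders are gone

    // Single sink task: the only place rows are aggregated and flushed.
    let sink = thread::spawn(move || {
        let mut total = 0usize;
        for batch in rx {
            total += batch.len();
        }
        total
    });

    for w in workers {
        w.join().unwrap();
    }
    assert_eq!(sink.join().unwrap(), 3);
}
```

The bounded channel is also what makes backpressure automatic: once the sink lags, worker sends block instead of buffering without limit.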
Checkpoint and File Identity Semantics¶
Checkpoint state is persisted in ClickHouse and cached in memory. Identity is (source_file, source_inode, source_generation, last_offset, last_line_no) plus status. source_generation is the anti-corruption key: if inode changes or file size drops below offset, generation increments and offsets reset, making rotation/truncation safe. model.rs:L5 · ingestor.rs:L584 · ingestor.rs:L586 · 001_schema.sql:L95
A file is skipped when current size equals checkpoint offset and generation is unchanged. This optimization removes unnecessary scans on quiet files. Conversely, any generation change causes processing even if no new bytes are present, because downstream consumers need the checkpoint transition persisted to avoid stale inode assumptions later. ingestor.rs:L593 · ingestor.rs:L711
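The generation-rollover and skip rules combine into one small decision. The sketch below is a hypothetical condensation of that logic (the field names follow the prose, the function name is invented):

```rust
// Hypothetical checkpoint identity sketch; field names follow the prose.
#[derive(Clone, Debug, PartialEq)]
struct Checkpoint {
    source_inode: u64,
    source_generation: u32,
    last_offset: u64,
}

// Decide how to treat a file given its current inode and size.
// Returns the checkpoint to resume from and whether a read is needed.
fn plan(cp: &Checkpoint, inode: u64, size: u64) -> (Checkpoint, bool) {
    if inode != cp.source_inode || size < cp.last_offset {
        // Rotation or truncation: bump generation, reset offset, and
        // always process so the transition itself gets persisted.
        let rolled = Checkpoint {
            source_inode: inode,
            source_generation: cp.source_generation + 1,
            last_offset: 0,
        };
        (rolled, true)
    } else if size == cp.last_offset {
        // Quiet file: same generation, no new bytes -> skip entirely.
        (cp.clone(), false)
    } else {
        // Plain append: resume reading at the stored offset.
        (cp.clone(), true)
    }
}

fn main() {
    let cp = Checkpoint { source_inode: 42, source_generation: 1, last_offset: 100 };
    assert!(!plan(&cp, 42, 100).1); // quiet: skip
    assert!(plan(&cp, 42, 180).1);  // append: read from offset 100
    let (rolled, must_read) = plan(&cp, 43, 50); // rotated inode
    assert!(must_read);
    assert_eq!(rolled.source_generation, 2);
    assert_eq!(rolled.last_offset, 0);
}
```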
Checkpoint updates are merged in-memory before flush. Replacement policy prefers higher generation, then higher offset within generation. This ensures that out-of-order chunk completion cannot regress progress. Because updates are eventually persisted through ReplacingMergeTree(updated_at), retries are safe and idempotent at the logical checkpoint level. ingestor.rs:L427 · ingestor.rs:L434 · 001_schema.sql:L104
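The replacement policy is a pure ordering rule, sketched below with invented type and function names:

```rust
// Merge policy sketch: prefer higher generation, then higher offset
// within the same generation. Names are illustrative.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Progress {
    generation: u32,
    offset: u64,
}

fn merge(current: Progress, update: Progress) -> Progress {
    if update.generation > current.generation
        || (update.generation == current.generation && update.offset > current.offset)
    {
        update
    } else {
        current
    }
}

fn main() {
    let cur = Progress { generation: 2, offset: 500 };
    // A late-arriving chunk from the same generation cannot regress progress:
    assert_eq!(merge(cur, Progress { generation: 2, offset: 300 }), cur);
    // A higher offset advances:
    assert_eq!(merge(cur, Progress { generation: 2, offset: 900 }).offset, 900);
    // A generation bump wins even with a lower offset:
    assert_eq!(merge(cur, Progress { generation: 3, offset: 0 }).generation, 3);
}
```

Because merge is monotone under this ordering, applying the same update twice (a ClickHouse retry) leaves the result unchanged, which is the idempotence the prose describes.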
Normalization Contract¶
Normalization starts with a raw ledger row storing original JSON, top-level type, hash, inferred session ID, and source coordinates. This row is emitted for every parseable object and remains the forensic replay surface. normalize.rs:L372 · 001_schema.sql:L3
Canonical event rows are produced by branching on top-level type and payload subtype. Modern envelopes and legacy top-level records are mapped into shared fields (event_class, payload_type, actor_role, call_id, name, text_content), preserving compatibility across historical formats. normalize.rs:L469 · normalize.rs:L614 · normalize.rs:L664
Event identity is deterministic and source-based. event_uid is SHA-256 over source coordinates, generation, payload fingerprint, and suffix. That means replay of unchanged input yields stable IDs, while semantically distinct expansions (for example compacted-history children) receive unique suffix-scoped IDs. Deterministic identity is foundational for replacement-table semantics and trace joins. normalize.rs:L100 · normalize.rs:L108 · normalize.rs:L675
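The identity property can be demonstrated with a dependency-free sketch. The real service hashes with SHA-256; std's DefaultHasher stands in here only so the example needs no crates, and the parameter names are assumptions about the source coordinates involved.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch of deterministic, source-based identity. DefaultHasher is a
// stand-in for the SHA-256 used by the real service.
fn event_uid(
    file: &str,
    line_no: u64,
    generation: u32,
    payload_fingerprint: &str,
    suffix: &str,
) -> u64 {
    let mut h = DefaultHasher::new();
    (file, line_no, generation, payload_fingerprint, suffix).hash(&mut h);
    h.finish()
}

fn main() {
    // Replaying unchanged input yields the same UID...
    assert_eq!(
        event_uid("a.jsonl", 7, 1, "fp", ""),
        event_uid("a.jsonl", 7, 1, "fp", "")
    );
    // ...while suffix-scoped expansions (e.g. compacted children) diverge.
    assert_ne!(
        event_uid("a.jsonl", 7, 1, "fp", "child-0"),
        event_uid("a.jsonl", 7, 1, "fp", "child-1")
    );
}
```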
compacted records are represented as one parent compacted_raw row plus expanded children from replacement_history, each linked by compacted_parent_uid. This preserves both compaction boundaries and chronological detail. normalize.rs:L674 · normalize.rs:L694 · normalize.rs:L716 · 002_views.sql:L6
Text extraction is recursively schema-aware but bounded. Message extraction walks nested arrays/objects and collects string fields from known keys, truncating very large strings to cap memory. Function arguments and tool output are similarly bounded at high character limits. This keeps indexing useful for verbose payloads without allowing unbounded per-row memory amplification. normalize.rs:L53 · normalize.rs:L89 · normalize.rs:L498 · normalize.rs:L526
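The shape of that bounded recursion can be sketched without serde_json by using a tiny hand-rolled value type. The key set, limit, and names here are invented for illustration; the real limits are much higher and the real key list is schema-specific.

```rust
// Simplified stand-in for the recursive, bounded text extraction.
enum Val {
    Str(String),
    Arr(Vec<Val>),
    Obj(Vec<(String, Val)>),
}

const MAX_CHARS: usize = 32; // illustrative cap; real limits are much higher

fn collect_text(v: &Val, out: &mut Vec<String>) {
    match v {
        // Truncate each string to cap per-row memory amplification.
        Val::Str(s) => out.push(s.chars().take(MAX_CHARS).collect()),
        // Arrays: recurse into every element.
        Val::Arr(items) => items.iter().for_each(|i| collect_text(i, out)),
        // Objects: recurse only into known text-bearing keys.
        Val::Obj(fields) => fields
            .iter()
            .filter(|(k, _)| matches!(k.as_str(), "text" | "content"))
            .for_each(|(_, v)| collect_text(v, out)),
    }
}

fn main() {
    let payload = Val::Obj(vec![
        ("text".into(), Val::Str("hello".into())),
        ("metadata".into(), Val::Str("ignored".into())),
        ("content".into(), Val::Arr(vec![Val::Str("x".repeat(100))])),
    ]);
    let mut out = Vec::new();
    collect_text(&payload, &mut out);
    assert_eq!(out, vec!["hello".to_string(), "x".repeat(MAX_CHARS)]);
}
```

The two bounds interact: the key filter limits *which* strings are collected, and the truncation limits *how large* any single collected string can be.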
Token accounting is preserved rather than normalized into fixed numeric columns. For event_msg payloads with token_count type, the service stores a compact JSON blob in token_usage_json. This keeps ingestion schema-forward compatible with evolving token metadata while allowing downstream extraction in SQL when needed. normalize.rs:L624 · 001_schema.sql:L43
Sink, Flush, and Durability Semantics¶
The sink task is the ingestion durability boundary. Workers send RowBatch messages; sink aggregates rows and flushes on batch threshold or timer. Heartbeats are emitted from the same loop to expose queue depth, flush latency, and counters. ingestor.rs:L311 · ingestor.rs:L345 · ingestor.rs:L362 · ingestor.rs:L390
Flush order is fixed and intentional: raw_events first, events second, event_links third, tool_io fourth, ingest_errors fifth, and ingest_checkpoints last. The ordering prioritizes data availability before progress advancement. If flush fails before checkpoint insert, retried processing may duplicate rows but will not skip unseen data; this is exactly the intended at-least-once safety property. ingestor.rs:L597 · ingestor.rs:L598 · ingestor.rs:L599 · ingestor.rs:L600 · ingestor.rs:L601 · ingestor.rs:L603
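The at-least-once argument reduces to one invariant: the checkpoint table is written last, and any earlier failure aborts the sequence. A minimal sketch (the writer callback is hypothetical; only the table order comes from the prose):

```rust
// Sketch of the fixed flush order: data tables first, checkpoints last,
// so a mid-sequence failure can duplicate rows but never skip data.
const FLUSH_ORDER: [&str; 6] = [
    "raw_events",
    "events",
    "event_links",
    "tool_io",
    "ingest_errors",
    "ingest_checkpoints",
];

// `insert` is any fallible table writer; stop at the first failure so
// checkpoints never advance past unwritten data.
fn flush(insert: &mut dyn FnMut(&str) -> Result<(), String>) -> Result<(), String> {
    for table in FLUSH_ORDER {
        insert(table)?;
    }
    Ok(())
}

fn main() {
    // Simulate a failure on event_links: checkpoints must not be written.
    let mut written: Vec<String> = Vec::new();
    let _ = flush(&mut |t: &str| {
        if t == "event_links" {
            return Err("db error".into());
        }
        written.push(t.to_string());
        Ok(())
    });
    assert_eq!(written, vec!["raw_events".to_string(), "events".to_string()]);
    assert!(!written.iter().any(|t| t.as_str() == "ingest_checkpoints"));
}
```

On the retry, raw_events and events rows are inserted again (duplicates, absorbed by replacement-table semantics), but the unwritten checkpoint means no input is ever skipped.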
On successful flush, counters and checkpoint cache advance. On failure, buffers remain resident and flush_failures increments; the service does not crash immediately. This favors continuity under transient DB instability but requires operators to monitor heartbeat error fields and queue depth. ingestor.rs:L492 · ingestor.rs:L500 · ingestor.rs:L513 · 003_ingest_heartbeats.sql:L14
Backpressure and Scheduling Under Load¶
Backpressure emerges naturally from bounded channels and semaphore limits. If workers produce faster than sink can flush, sink channel saturation slows worker sends. If enqueue rate exceeds worker drain rate, queue_depth rises and can be observed in heartbeat rows. This architecture avoids runaway memory on the happy path while providing clear observability when the system nears capacity. ingestor.rs:L62 · ingestor.rs:L74 · ingestor.rs:L393 · ingestor.rs:L545
Debounce interval tuning materially changes behavior under churn. Very low debounce values reduce trigger latency but increase duplicate scheduling pressure; higher values coalesce better but delay visibility. The default of 50 ms is a compromise oriented toward fast local append workloads. Reconcile interval similarly trades CPU scan cost for missed-event recovery delay; default is 30 s. moraine.toml:L18-19 · ingestor.rs:L238
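The coalescing trade-off can be made concrete with a simplified model: treat events whose inter-arrival gap is within the debounce window as one trigger. This is a deliberately reduced sketch of debounce behavior, not the service's actual scheduling code.

```rust
// Simplified debounce model: count how many triggers a sequence of file
// events produces. Events within `debounce_ms` of the previous event
// coalesce into the pending trigger; larger gaps start a new one.
fn coalesce(event_times_ms: &[u64], debounce_ms: u64) -> usize {
    let mut triggers = 0;
    let mut last: Option<u64> = None;
    for &t in event_times_ms {
        match last {
            // Within the window: absorbed by the pending trigger.
            Some(prev) if t - prev <= debounce_ms => {}
            // Quiet gap (or first event): a new trigger fires.
            _ => triggers += 1,
        }
        last = Some(t);
    }
    triggers
}

fn main() {
    // A burst at t=0..20 ms plus a late event at 200 ms:
    let events = [0, 10, 20, 200];
    // 50 ms debounce collapses the burst: 2 triggers total.
    assert_eq!(coalesce(&events, 50), 2);
    // 5 ms debounce fires on every event: 4 triggers, more duplicate work.
    assert_eq!(coalesce(&events, 5), 4);
}
```

The same shape governs the reconcile interval: a longer interval means fewer full glob scans but a longer worst-case delay before a dropped watcher event is recovered.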
Batch size and flush interval jointly define visibility cadence and insert efficiency. Larger batches and longer flush intervals reduce insert overhead and improve throughput; smaller values improve freshness at the cost of higher insert request rates and write amplification. The sink’s timer-based flush means low-traffic files still become visible promptly without waiting for high batch thresholds. moraine.toml:L12-13 · ingestor.rs:L326 · ingestor.rs:L362
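The sink's flush decision is the disjunction of those two conditions. A sketch under assumed names (the predicate and its parameters are illustrative):

```rust
use std::time::Duration;

// Sketch of the sink's flush decision: flush when the batch threshold is
// reached OR the flush timer has elapsed, so quiet files still surface.
fn should_flush(
    buffered_rows: usize,
    batch_size: usize,
    since_last_flush: Duration,
    flush_interval: Duration,
) -> bool {
    buffered_rows >= batch_size
        || (buffered_rows > 0 && since_last_flush >= flush_interval)
}

fn main() {
    let interval = Duration::from_secs(5);
    // Full batch: flush immediately, regardless of the timer.
    assert!(should_flush(1000, 1000, Duration::from_secs(0), interval));
    // Tiny batch on a quiet file: the timer forces visibility.
    assert!(should_flush(3, 1000, Duration::from_secs(6), interval));
    // Empty buffer: never flush, no matter how long it has been.
    assert!(!should_flush(0, 1000, Duration::from_secs(60), interval));
}
```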
Failure Modes and Recovery Behavior¶
The highest-risk correctness failure mode is stale progress across file lifecycle changes. Generation rollover resets offsets on inode change or shrink; removing this logic can silently skip rotated data. ingestor.rs:L584 · ingestor.rs:L587
Watcher unreliability is handled by design: reconcile rescans for eventual rediscovery, and checkpoints guarantee resumed offsets. If reconcile is disabled, correctness is no longer defensible under missed events. ingestor.rs:L284 · ingestor.rs:L296 · ingestor.rs:L572
Malformed records are logged to ingest_errors and skipped. This prevents pipeline stalls but can hide upstream regressions if error rates are not observed. Operational policy should treat increasing ingest_errors as a schema drift signal requiring normalization updates, not as harmless noise. ingestor.rs:L648 · 001_schema.sql:L80
During prolonged ClickHouse outages, in-memory buffers can grow because failed flushes keep data resident. The system does not currently implement disk-backed queue spill or adaptive throttling. For heavy ingestion workloads, this is the primary known limitation and should inform incident response: restore DB availability quickly or temporarily reduce input rate. ingestor.rs:L512 · clickhouse.rs:L62
Tuning and Extension Guidance¶
When tuning throughput, change one parameter class at a time and observe heartbeats and lag indicators. Typical order is: increase max_file_workers, then batch_size, then adjust flush_interval_seconds, then revisit max_inflight_batches. Simultaneous broad changes make causal attribution impossible under bursty append traffic. moraine.toml:L11-12 · moraine.toml:L15-16 · 003_ingest_heartbeats.sql:L5
When adding new event payload shapes, extend normalize_record rather than branching downstream SQL to interpret raw payload blobs. Canonical field population is the contract boundary; if it weakens, retrieval and analytics logic accumulate format-specific conditionals and eventually diverge. Every new branch should preserve source coordinates, deterministic UID behavior, and payload JSON preservation semantics. normalize.rs:L157 · normalize.rs:L177 · normalize.rs:L336
When changing extraction or token-usage capture, verify impacts in both canonical tables and search projections. Search materialized views depend on non-empty text_content; extraction regressions directly reduce recall and can appear as retrieval quality failures rather than ingestion incidents. This coupling is subtle and should be tested explicitly after normalization edits. 004_search_index.sql:L53 · 004_search_index.sql:L80 · normalize.rs:L651
The implementation is intentionally explicit. Preserve that property when extending the service.