Ingest Sources
Moraine normalizes agent trace files through source adapters in
crates/moraine-ingest-core/src/sources/. Each adapter owns one harness wire
format and returns normalized event, link, and tool rows. The generic ingest
wrapper keeps cross-source behavior centralized: raw rows, timestamp parse
errors, session/model hints, source references, schema-domain checks, and sink
fan-out.
The adapter boundary is intentionally small. New source work should add parsing
and source-local handlers in sources/<name>.rs, then use shared builders for
all rows that leave the adapter.
Source Map
| Harness | Module | Default provider | Typical format | Notes |
|---|---|---|---|---|
| `codex` | `sources/codex.rs` | `openai` | `jsonl` | OpenAI/Codex events, response items, tool calls, compaction, token counts. |
| `claude-code` | `sources/claude_code.rs` | `anthropic` | `jsonl` | Claude Code message blocks, operational records, parent/tool external links. |
| `kimi-cli` | `sources/kimi_cli.rs` | `moonshot` | `jsonl` | Kimi `wire.jsonl`; skips metadata headers and drops parent `SubagentEvent` rows. |
| `cursor` | `sources/cursor.rs` | `cursor` | `jsonl` or `cursor_sqlite` | Cursor Agent transcripts under `agent-transcripts/`; text blocks, tool-use blocks, and local file references. Also normalizes the synthetic `cursor_composer`/`cursor_bubble` records produced by polling `state.vscdb` (see SQLite-Polled Sources below); composer names become `session_meta` events that carry the session title. |
| `hermes` | `sources/hermes.rs` | record-derived | `jsonl` or `session_json` | ShareGPT trajectories and live Hermes session JSON with vendor/model splitting. |
| `pi-coding-agent` | `sources/pi.rs` | record-derived | `jsonl` | Pi session JSONL trees, model/thinking metadata, assistant tool calls, tool results, and parent links. |
Adapter Contract
Each adapter implements IngestSource from sources/mod.rs:
| Method | Responsibility |
|---|---|
| `harness()` | Stable harness string written to ClickHouse and matched by config. |
| `default_inference_provider()` | Static provider when the source has one. Return `None` when provider is record-derived. |
| `format()` | Source default format. Most sources use `Jsonl`; config can still select parser behavior. |
| `preflight()` | Skip or rewrite source records before generic raw/error handling. Kimi skips its metadata header here. |
| `source_metadata()` | Per-record provider and model hint discovery. Hermes splits vendor/model here. |
| `record_ts()` | Timestamp string consumed by the shared parser. Kimi converts Unix-second wire timestamps here. |
| `top_type()` | Source-level discriminator used for dispatch inside the adapter. |
| `session_id()` | Stable session id derived from record fields, file path, prior session hint, or raw UID. |
| `normalize()` | Emit `NormalizedPartials`: event rows, event link rows, and tool IO rows. |
normalize() receives a RecordContext. That context has already been stamped
with source name, harness, provider, session id, source file coordinates, and
parsed timestamps. Adapters should not rebuild those fields by hand.
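To make the shape of the contract concrete, here is a minimal sketch of a few of these methods on a simplified trait. Everything in it is an assumption for illustration: the trait name `IngestSourceSketch`, the `Record` struct, the `"metadata"` header marker, the method signatures, and the choice of RFC 3339 as the string handed to the shared parser. The real `IngestSource` trait in sources/mod.rs will differ.

```rust
/// Hypothetical stand-in for a parsed source record.
struct Record {
    top_type: String,
    unix_ts: Option<i64>,
}

/// Assumed, simplified shape of part of the adapter contract (not the real trait).
trait IngestSourceSketch {
    /// Stable harness string.
    fn harness(&self) -> &'static str;
    /// Static provider, or None when the provider is record-derived.
    fn default_inference_provider(&self) -> Option<&'static str>;
    /// Return false to skip a record before generic raw/error handling.
    fn preflight(&self, record: &Record) -> bool;
    /// Timestamp string for the shared parser.
    fn record_ts(&self, record: &Record) -> Option<String>;
    /// Source-level discriminator used for dispatch.
    fn top_type<'a>(&self, record: &'a Record) -> &'a str;
}

/// Convert Unix seconds (UTC) to an RFC 3339 string using days-to-civil arithmetic.
fn unix_seconds_to_rfc3339(secs: i64) -> String {
    let days = secs.div_euclid(86_400);
    let rem = secs.rem_euclid(86_400);
    let (h, m, s) = (rem / 3600, (rem % 3600) / 60, rem % 60);
    let z = days + 719_468; // shift epoch to 0000-03-01
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day of 400-year era
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year
    let mp = (5 * doy + 2) / 153; // March-based month index
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };
    format!("{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z", year, month, day, h, m, s)
}

/// A Kimi-like adapter: static provider, header skipping, Unix-second timestamps.
struct KimiLikeSource;

impl IngestSourceSketch for KimiLikeSource {
    fn harness(&self) -> &'static str {
        "kimi-cli"
    }
    fn default_inference_provider(&self) -> Option<&'static str> {
        Some("moonshot")
    }
    fn preflight(&self, record: &Record) -> bool {
        // Assumed marker for the metadata header; the real check is not shown here.
        record.top_type != "metadata"
    }
    fn record_ts(&self, record: &Record) -> Option<String> {
        record.unix_ts.map(unix_seconds_to_rfc3339)
    }
    fn top_type<'a>(&self, record: &'a Record) -> &'a str {
        record.top_type.as_str()
    }
}
```

The point of the sketch is the division of labor: the adapter answers narrow questions about one record, and the generic wrapper decides what to do with the answers.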
Shared Helpers
Use sources/shared.rs for common invariants:
- Scalar conversion helpers such as `to_str`, `to_u32`, `to_u8_bool`, and `extract_message_text`.
- Timestamp helpers, session id inference, and stable `event_uid` generation.
- Domain canonicalization for event kind, payload type, link type, and model names where the source allows generic canonicalization.
- `RecordContext`, `base_event_obj`, `build_link_row`, and `build_tool_row`.
- `TokenAccounting` for legacy token columns plus `token_usage_buckets` and `token_usage_native_units`.

Use sources/emitter.rs for row construction:
- `SourceEmitter` owns a `RecordContext` and accumulates `NormalizedPartials`.
- `EventBuilder` wraps `base_event_obj` with typed setters for common columns.
- `push_tool_request`, `push_tool_response`, and link helpers keep row shape in sync with shared constructors.

Use sources/record_view.rs when a source benefits from structured field reads
or table-driven dispatch. It gives path-aware errors and keeps handler routing
explicit without ad hoc Value indexing everywhere.
Handler Pattern
Keep normalize() thin. It should usually create a source-local record context,
construct a SourceEmitter, dispatch to named handlers, and return
emitter.finish().
Prefer source-local helpers for source semantics:
- `normalize_*_record` or `dispatch_*_record` for top-level routing.
- `handle_*` or `emit_*` functions for source event families.
- Small context structs for repeated record fields, token accounting, model state, pending tool correlation, and timestamp sequencing.
- Source-specific parsing and correlation in the source module, not in shared helpers.

Shared helpers should only grow when more than one source needs the same row contract or schema-domain rule. Source quirks such as Hermes trajectory tool backpatching, Kimi placeholder models, or Claude external parent UUID links belong in the source module.
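The thin-`normalize()` pattern can be sketched roughly as follows. The types here (`EmitterSketch`, `NormalizedPartialsSketch`, `SourceRecord`) and the string-based row encoding are hypothetical stand-ins chosen to keep the example self-contained; the real `SourceEmitter`, `EventBuilder`, and `push_tool_request` have their own signatures that this document does not show.

```rust
/// Hypothetical, simplified output container (rows are plain strings here).
#[derive(Debug, Default, PartialEq)]
struct NormalizedPartialsSketch {
    events: Vec<String>,
    links: Vec<String>,
    tools: Vec<String>,
}

/// Hypothetical emitter: owns per-record context and accumulates rows.
struct EmitterSketch {
    session_id: String,
    out: NormalizedPartialsSketch,
}

impl EmitterSketch {
    fn new(session_id: &str) -> Self {
        Self { session_id: session_id.to_string(), out: NormalizedPartialsSketch::default() }
    }
    fn push_event(&mut self, kind: &str, text: &str) {
        self.out.events.push(format!("{}:{}:{}", self.session_id, kind, text));
    }
    fn push_tool_request(&mut self, tool_call_id: &str, tool_name: &str) {
        self.out.tools.push(format!("request:{}:{}", tool_call_id, tool_name));
    }
    fn finish(self) -> NormalizedPartialsSketch {
        self.out
    }
}

/// Hypothetical source-local view of one record.
struct SourceRecord<'a> {
    top_type: &'a str,
    text: &'a str,
    tool_call_id: Option<&'a str>,
}

// Named handlers hold the source semantics for each event family.
fn handle_message(em: &mut EmitterSketch, rec: &SourceRecord<'_>) {
    em.push_event("message", rec.text);
}

fn handle_tool_call(em: &mut EmitterSketch, rec: &SourceRecord<'_>) {
    em.push_event("tool_call", rec.text);
    if let Some(id) = rec.tool_call_id {
        em.push_tool_request(id, rec.text);
    }
}

/// Thin entry point: build the emitter, dispatch, return the accumulated rows.
fn normalize(session_id: &str, rec: &SourceRecord<'_>) -> NormalizedPartialsSketch {
    let mut em = EmitterSketch::new(session_id);
    match rec.top_type {
        "message" => handle_message(&mut em, rec),
        "tool_call" => handle_tool_call(&mut em, rec),
        _ => {} // Unknown record types emit nothing in this sketch.
    }
    em.finish()
}
```

Keeping the `match` as the only logic in `normalize()` makes it easy to see which record types a source handles and where each one is implemented.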
Row Invariants
Adapters must emit rows that already satisfy the normalized schema domain:
- Event UIDs are deterministic for the source file coordinate, record payload, and source-local suffix. Do not reuse the raw record UID for multiple normalized events.
- `event_kind`, `payload_type`, and `actor_kind` must use the canonical domain.
- Links must distinguish normalized event links (`linked_event_uid`) from source external ids (`linked_external_id`).
- Tool request/response rows must reference an emitted event UID and preserve `tool_call_id`, `tool_name`, `tool_phase`, `tool_error`, input JSON, output JSON, and output text.
- Token usage must populate both legacy scalar columns and `token_usage_buckets`. Bucket names are schema-controlled and tested against SQL migrations.
- `model`, `inference_provider`, and `session_id` must preserve source-specific semantics. Hermes model strings are split from provider/model input and are not blindly canonicalized through generic helpers.
- Synthetic timestamp sequencing should only advance for rows that are actually emitted. This matters for Hermes trajectory segment parsing.
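The first invariant, deterministic event UIDs with a source-local suffix, might look something like the sketch below. The function name `event_uid_sketch`, the choice of FNV-1a as the hash, the length-prefixed encoding, and the suffix strings are all assumptions; the shared `event_uid` helper may use different material and a different hash.

```rust
/// 64-bit FNV-1a, used here only because it is small and deterministic.
fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for b in bytes {
        hash ^= u64::from(*b);
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Derive a UID from the source file coordinate, the record payload,
/// and a source-local suffix that separates events from the same record.
fn event_uid_sketch(source_file: &str, line_no: u64, payload: &str, suffix: &str) -> String {
    let line = line_no.to_string();
    let mut material = Vec::new();
    for part in [source_file, line.as_str(), payload, suffix] {
        // Length-prefix each part so ("ab", "c") and ("a", "bc") hash differently.
        material.extend_from_slice(&(part.len() as u64).to_le_bytes());
        material.extend_from_slice(part.as_bytes());
    }
    format!("{:016x}", fnv1a64(&material))
}
```

Two events derived from the same raw record would call this with different suffixes (for example a text-block suffix and a tool-call suffix), which is what keeps them from sharing a UID.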
SQLite-Polled Sources
Some harnesses keep history in a live SQLite database instead of append-only
trace files. crates/moraine-ingest-core/src/sqlite_poll.rs is the shared
polling engine for those sources; the cursor_sqlite format (Cursor
state.vscdb) is its first consumer. The engine synthesizes one record per
relevant database row and feeds it through normalize_record, so the source
adapter stays format-agnostic — sources/cursor.rs handles the synthetic
cursor_composer/cursor_bubble records the same way it handles JSONL lines.
Mechanics that differ from file-backed sources:
- Hash-based change cursor. Cursor's `cursorDiskKV` table has no rowid or timestamp watermark, so the poll cursor is a per-key content-hash map persisted in the `ingest_checkpoints.cursor_json` column (migration `sql/015_sqlite_checkpoint_cursor.sql`). A poll emits synthetic records only for keys that are new or whose hash changed, and prunes deleted keys after each full scan.
- Bounded prefix scans. Only the `composerData:` and `bubbleId:` key prefixes are scanned, in pages, with a 10,000-key ceiling. Larger key spaces fail the poll instead of persisting an oversized cursor. `agentKv:*`, `checkpointId:*`, and the entire `ItemTable` (which holds live auth tokens) are deliberately out of scope for v1.
- Stable logical event UIDs. UID material derives from the kv key (`cursor_sqlite:<table>:<pk>`), not the mutable payload, so a row that mutates in place (streaming text, tool status changes) re-emits the same event UIDs with a newer `event_version` and `ReplacingMergeTree` collapses them.
- Sidecar watching. `moraine_config::map_tracked_path` maps `state.vscdb-wal`/`state.vscdb-shm` filesystem events back to the canonical `state.vscdb` path so WAL-only writes trigger polls; `state.vscdb.backup` is untracked. Databases are opened read-only with a short busy timeout, and Moraine never checkpoints another application's WAL.
- Rate-limited errors. Failures surface as `ingest_errors` rows with kinds `sqlite_open_error`, `sqlite_schema_mismatch`, `sqlite_cursor_too_large`, and `sqlite_scan_error`. The cursor records the last reported kind so a persistent failure is emitted once rather than on every reconcile tick, and the data cursor is left untouched so the next poll retries.
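The hash-based change cursor and the key ceiling can be illustrated with a small in-memory sketch. It assumes content hashes are already computed as `u64` values and that a full scan arrives as a slice; `diff_poll`, `PollError`, and `MAX_KEYS` are hypothetical names, and the real engine in `sqlite_poll.rs` pages through SQLite and persists the map as JSON.

```rust
use std::collections::HashMap;

/// Ceiling on scanned keys, mirroring the 10,000-key limit described above.
const MAX_KEYS: usize = 10_000;

#[derive(Debug, PartialEq)]
enum PollError {
    /// The key space exceeded the ceiling; carries the observed key count.
    CursorTooLarge(usize),
}

/// Compare a full scan of (key, content hash) pairs against the stored cursor.
/// Returns the keys that are new or changed (sorted) and updates the cursor.
fn diff_poll(
    cursor: &mut HashMap<String, u64>,
    scanned: &[(String, u64)],
) -> Result<Vec<String>, PollError> {
    if scanned.len() > MAX_KEYS {
        // Fail without touching the cursor so the next poll retries.
        return Err(PollError::CursorTooLarge(scanned.len()));
    }
    let mut changed = Vec::new();
    let mut next = HashMap::with_capacity(scanned.len());
    for (key, hash) in scanned {
        if cursor.get(key) != Some(hash) {
            changed.push(key.clone());
        }
        next.insert(key.clone(), *hash);
    }
    changed.sort();
    // Replacing the map drops keys that no longer exist in the database.
    *cursor = next;
    Ok(changed)
}
```

A second poll over unchanged data returns an empty list, which is the no-op repoll case; a mutated row shows up again under the same key, which is why UID material has to come from the key rather than the payload.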
Fixtures: fixtures/cursor/state-vscdb-kv.jsonl stores cursorDiskKV rows as
JSONL (key/value pairs). The golden contract test feeds each row through
the production synthesis path, so golden output cannot drift from the poller;
skipped rows (ghost composers, out-of-scope key families) appear as empty
snapshots so scope changes stay visible in review. Unit tests in
sqlite_poll.rs build temporary rusqlite databases to cover first-poll
emission, no-op repolls, in-place mutation re-emitting stable UIDs, payload
hygiene, and the schema-mismatch and oversized-key-space error paths; sidecar
path mapping is tested in moraine-config.
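The sidecar path mapping described in the list above could be sketched as below. This is only an illustration of the `state.vscdb` case: `map_tracked_path_sketch` is a hypothetical name, its signature is an assumption, and the real `moraine_config::map_tracked_path` presumably handles other tracked file types as well.

```rust
use std::path::{Path, PathBuf};

/// Map a filesystem event path to the canonical database path, if tracked.
/// Hypothetical sketch covering only Cursor's state.vscdb and its sidecars.
fn map_tracked_path_sketch(path: &Path) -> Option<PathBuf> {
    let name = path.file_name()?.to_str()?;
    match name {
        // The database itself maps to its own path.
        "state.vscdb" => Some(path.to_path_buf()),
        // WAL and shared-memory sidecars map back to the database path.
        "state.vscdb-wal" | "state.vscdb-shm" => Some(path.with_file_name("state.vscdb")),
        // Everything else, including state.vscdb.backup, is untracked here.
        _ => None,
    }
}
```

With a mapping like this, a write that only touches the WAL file still resolves to the database path and can trigger a poll.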
Fixtures And Contracts
Deterministic fixtures live under fixtures/; golden snapshots live under
crates/moraine-ingest-core/tests/goldens/source_normalization/.
crates/moraine-ingest-core/tests/golden_fixtures.rs runs the same public
normalization entry point that ingest uses and asserts row shape across all
source families.
Fixture tests should verify both adapter output and generic wrapper output:
- `raw_row.harness`, `raw_row.inference_provider`, `raw_row.top_type`, `raw_row.session_id`, and raw JSON hashing.
- Event row `event_kind`, `payload_type`, `actor_kind`, timestamps, model, token fields, content types, and text previews.
- Link row target columns and canonical link type.
- Tool row request/response phases, tool ids, previews, and error flags.
- `session_hint` and `model_hint` continuity across sequential records.
- Ingest error rows for invalid timestamps or malformed source records.

When behavior changes intentionally, update fixtures and goldens together. Otherwise, golden stability failures are regressions.
Config Boundary
moraine-config is lower-level than moraine-ingest-core; it cannot call the
ingest registry without creating a dependency cycle. Adding a harness therefore
requires updates on both sides:
- `crates/moraine-ingest-core/src/sources/mod.rs` registry.
- `moraine_config::KNOWN_INGEST_HARNESSES`.
- Default source examples in `crates/moraine-config/src/lib.rs` and `config/moraine.toml` when the source should be enabled by default.
- Documentation in `docs/configuration.md`.
- Registry/config sync tests and fixture contract tests.

The ingest-core registry test and golden fixture contract test are the guardrails that keep these lists aligned.
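A registry/config sync check of this kind can be sketched as a set comparison. The constant below is a local stand-in populated from the Source Map table, and `registry_harnesses` and `harness_drift` are hypothetical names; the actual test and the shape of the real registry are not shown in this document.

```rust
use std::collections::BTreeSet;

/// Local stand-in for the config-side list (values taken from the Source Map).
const KNOWN_INGEST_HARNESSES: &[&str] =
    &["codex", "claude-code", "kimi-cli", "cursor", "hermes", "pi-coding-agent"];

/// Local stand-in for the harness names exposed by the ingest-core registry.
fn registry_harnesses() -> Vec<&'static str> {
    vec!["codex", "claude-code", "kimi-cli", "cursor", "hermes", "pi-coding-agent"]
}

/// Returns (names missing from config, names missing from the registry).
fn harness_drift<'a>(config: &[&'a str], registry: &[&'a str]) -> (Vec<&'a str>, Vec<&'a str>) {
    let config: BTreeSet<&'a str> = config.iter().copied().collect();
    let registry: BTreeSet<&'a str> = registry.iter().copied().collect();
    (
        registry.difference(&config).copied().collect(),
        config.difference(&registry).copied().collect(),
    )
}
```

Reporting both directions of drift makes a failure actionable: it says whether the new harness was registered without a config entry or listed in config without an adapter.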
Validation
For source adapter changes, run focused tests first and expand based on blast radius:
cargo fmt --all -- --check
cargo test -p moraine-ingest-core --locked
cargo test --workspace --locked
For ingest, monitor, MCP, ClickHouse schema, or source format changes, also run the functional stack and sandbox QA described in Harness Author Workflow: