crates/moraine-ingest-core/src/normalize.rs
1 use crate::model::NormalizedRecord;
2 use anyhow::{anyhow, Result};
3 use chrono::{DateTime, Utc};
4 use regex::Regex;
5 use serde_json::{json, Map, Value};
6 use sha2::{Digest, Sha256};
7 use std::sync::OnceLock;
8 use std::time::{SystemTime, UNIX_EPOCH};
9
/// Maximum number of characters kept when text, tool input, or tool output is
/// passed through `truncate_chars` before being stored on a row.
const TEXT_LIMIT: usize = 200_000;
/// Maximum number of characters kept for the short `*_preview` fields.
const PREVIEW_LIMIT: usize = 320;
/// Placeholder `event_ts` returned by `parse_event_ts` when the record
/// timestamp cannot be parsed as RFC 3339.
const UNPARSEABLE_EVENT_TS: &str = "1970-01-01 00:00:00.000";
13
14 fn session_id_re() -> &'static Regex {
15 static SESSION_ID_RE: OnceLock<Regex> = OnceLock::new();
16 SESSION_ID_RE.get_or_init(|| {
17 Regex::new(
18 r"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})$",
19 )
20 .expect("valid session id regex")
21 })
22 }
23
24 fn session_date_re() -> &'static Regex {
25 static SESSION_DATE_RE: OnceLock<Regex> = OnceLock::new();
26 SESSION_DATE_RE.get_or_init(|| {
27 Regex::new(r"/(?:sessions|projects)/(\d{4})/(\d{2})/(\d{2})/")
28 .expect("valid session date regex")
29 })
30 }
31
32 fn to_str(value: Option<&Value>) -> String {
33 match value {
34 None | Some(Value::Null) => String::new(),
35 Some(Value::String(s)) => s.clone(),
36 Some(other) => other.to_string(),
37 }
38 }
39
40 fn to_u32(value: Option<&Value>) -> u32 {
41 match value {
42 Some(Value::Number(n)) => n.as_u64().unwrap_or(0).min(u32::MAX as u64) as u32,
43 Some(Value::String(s)) => s.parse::<u64>().unwrap_or(0).min(u32::MAX as u64) as u32,
44 _ => 0,
45 }
46 }
47
48 fn to_u16(value: Option<&Value>) -> u16 {
49 to_u32(value).min(u16::MAX as u32) as u16
50 }
51
52 fn to_u8_bool(value: Option<&Value>) -> u8 {
53 match value {
54 Some(Value::Bool(v)) => {
55 if *v {
56 1
57 } else {
58 0
59 }
60 }
61 Some(Value::Number(v)) => {
62 if v.as_i64().unwrap_or(0) != 0 {
63 1
64 } else {
65 0
66 }
67 }
68 Some(Value::String(s)) => {
69 let lower = s.to_ascii_lowercase();
70 if lower == "true" || lower == "1" {
71 1
72 } else {
73 0
74 }
75 }
76 _ => 0,
77 }
78 }
79
80 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
81 enum Provider {
82 Codex,
83 Claude,
84 }
85
86 impl Provider {
87 fn parse(raw: &str) -> Result<Self> {
88 match raw.trim().to_ascii_lowercase().as_str() {
89 "codex" => Ok(Self::Codex),
90 "claude" => Ok(Self::Claude),
91 _ => Err(anyhow!(
92 "unsupported provider `{}`; expected one of: codex, claude",
93 raw.trim()
94 )),
95 }
96 }
97
98 fn as_str(self) -> &'static str {
99 match self {
100 Self::Codex => "codex",
101 Self::Claude => "claude",
102 }
103 }
104 }
105
/// Normalizes a raw model name into its canonical form.
///
/// The name is trimmed, ASCII-lowercased, and has spaces replaced by `-`.
/// A blank name yields the empty string. For the `codex` provider, the bare
/// name `codex` is expanded to `gpt-5.3-codex-xhigh`.
fn canonicalize_model(provider: &str, raw_model: &str) -> String {
    let trimmed = raw_model.trim();
    if trimmed.is_empty() {
        return String::new();
    }

    let normalized = trimmed.to_ascii_lowercase().replace(' ', "-");

    let is_bare_codex_alias = provider == "codex" && normalized == "codex";
    if is_bare_codex_alias {
        "gpt-5.3-codex-xhigh".to_string()
    } else {
        normalized
    }
}
120
121 fn resolve_model_hint(event_rows: &[Value], provider: &str, fallback: &str) -> String {
122 for row in event_rows.iter().rev() {
123 if let Some(model) = row.get("model").and_then(Value::as_str) {
124 let normalized = canonicalize_model(provider, model);
125 if !normalized.is_empty() {
126 return normalized;
127 }
128 }
129 }
130
131 canonicalize_model(provider, fallback)
132 }
133
134 fn compact_json(value: &Value) -> String {
135 serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string())
136 }
137
/// Returns at most the first `max_chars` Unicode scalar values of `input`.
///
/// The cut always lands on a character boundary, so multi-byte characters are
/// never split. Input that already fits is returned unchanged.
fn truncate_chars(input: &str, max_chars: usize) -> String {
    // `nth(max_chars)` yields the byte offset of the first character that must
    // be dropped. The scan stops there, so oversized inputs are never walked
    // in full, and the kept prefix is copied as one slice.
    match input.char_indices().nth(max_chars) {
        Some((cut, _)) => input[..cut].to_string(),
        None => input.to_string(),
    }
}
145
146 fn extract_message_text(content: &Value) -> String {
147 fn walk(node: &Value, out: &mut Vec<String>) {
148 match node {
149 Value::String(s) => {
150 if !s.trim().is_empty() {
151 out.push(s.clone());
152 }
153 }
154 Value::Array(items) => {
155 for item in items {
156 walk(item, out);
157 }
158 }
159 Value::Object(map) => {
160 for key in ["text", "message", "output", "thinking", "summary"] {
161 if let Some(Value::String(s)) = map.get(key) {
162 if !s.trim().is_empty() {
163 out.push(s.clone());
164 }
165 }
166 }
167
168 for key in ["content", "text_elements", "input"] {
169 if let Some(value) = map.get(key) {
170 walk(value, out);
171 }
172 }
173 }
174 _ => {}
175 }
176 }
177
178 let mut chunks = Vec::<String>::new();
179 walk(content, &mut chunks);
180 truncate_chars(&chunks.join("\n"), TEXT_LIMIT)
181 }
182
183 fn extract_content_types(content: &Value) -> Vec<String> {
184 if let Value::Array(items) = content {
185 let mut out = Vec::<String>::new();
186 for item in items {
187 if let Some(t) = item.get("type").and_then(|v| v.as_str()) {
188 if !t.is_empty() {
189 out.push(t.to_string());
190 }
191 }
192 }
193 out.sort();
194 out.dedup();
195 return out;
196 }
197 Vec::new()
198 }
199
200 pub fn infer_session_id_from_file(source_file: &str) -> String {
201 let stem = std::path::Path::new(source_file)
202 .file_stem()
203 .and_then(|s| s.to_str())
204 .unwrap_or_default();
205
206 session_id_re()
207 .captures(stem)
208 .and_then(|cap| cap.get(1).map(|m| m.as_str().to_string()))
209 .unwrap_or_default()
210 }
211
212 pub fn infer_session_date_from_file(source_file: &str, record_ts: &str) -> String {
213 if let Some(cap) = session_date_re().captures(source_file) {
214 return format!("{}-{}-{}", &cap[1], &cap[2], &cap[3]);
215 }
216
217 parse_record_ts(record_ts)
218 .map(|dt| dt.format("%Y-%m-%d").to_string())
219 .unwrap_or_else(|| "1970-01-01".to_string())
220 }
221
222 fn parse_record_ts(record_ts: &str) -> Option<DateTime<Utc>> {
223 DateTime::parse_from_rfc3339(record_ts)
224 .ok()
225 .map(|dt| dt.with_timezone(&Utc))
226 }
227
228 fn format_event_ts(dt: &DateTime<Utc>) -> String {
229 dt.format("%Y-%m-%d %H:%M:%S%.3f").to_string()
230 }
231
232 fn parse_event_ts(record_ts: &str) -> (String, bool) {
233 if let Some(dt) = parse_record_ts(record_ts) {
234 return (format_event_ts(&dt), false);
235 }
236
237 (UNPARSEABLE_EVENT_TS.to_string(), true)
238 }
239
/// Reports whether `value` is one of the recognized event kinds
/// (exact, case-sensitive match).
fn event_kind_in_domain(value: &str) -> bool {
    const EVENT_KINDS: [&str; 14] = [
        "session_meta",
        "turn_context",
        "message",
        "tool_call",
        "tool_result",
        "reasoning",
        "event_msg",
        "compacted_raw",
        "progress",
        "system",
        "summary",
        "queue_operation",
        "file_history_snapshot",
        "unknown",
    ];

    EVENT_KINDS.iter().any(|known| *known == value)
}
259
/// Reports whether `value` is one of the recognized payload types
/// (exact, case-sensitive match).
fn payload_type_in_domain(value: &str) -> bool {
    const PAYLOAD_TYPES: [&str; 31] = [
        "session_meta",
        "turn_context",
        "message",
        "function_call",
        "function_call_output",
        "custom_tool_call",
        "custom_tool_call_output",
        "web_search_call",
        "reasoning",
        "response_item",
        "event_msg",
        "user_message",
        "agent_message",
        "agent_reasoning",
        "token_count",
        "task_started",
        "task_complete",
        "turn_aborted",
        "item_completed",
        "search_results_received",
        "compacted",
        "thinking",
        "tool_use",
        "tool_result",
        "text",
        "progress",
        "system",
        "summary",
        "queue-operation",
        "file-history-snapshot",
        "unknown",
    ];

    PAYLOAD_TYPES.iter().any(|known| *known == value)
}
296
/// Reports whether `value` is one of the recognized link types
/// (exact, case-sensitive match).
fn link_type_in_domain(value: &str) -> bool {
    const LINK_TYPES: [&str; 6] = [
        "parent_event",
        "compacted_parent",
        "parent_uuid",
        "tool_use_id",
        "source_tool_assistant",
        "unknown",
    ];

    LINK_TYPES.iter().any(|known| *known == value)
}
308
309 fn canonicalize_event_kind(value: &str) -> &str {
310 if event_kind_in_domain(value) {
311 value
312 } else {
313 "unknown"
314 }
315 }
316
317 fn canonicalize_payload_type(value: &str) -> &str {
318 if payload_type_in_domain(value) {
319 value
320 } else {
321 "unknown"
322 }
323 }
324
325 fn canonicalize_link_type(value: &str) -> &str {
326 if link_type_in_domain(value) {
327 value
328 } else {
329 "unknown"
330 }
331 }
332
333 fn event_uid(
334 source_file: &str,
335 source_generation: u32,
336 source_line_no: u64,
337 source_offset: u64,
338 record_fingerprint: &str,
339 suffix: &str,
340 ) -> String {
341 let material = format!(
342 "{}|{}|{}|{}|{}|{}",
343 source_file, source_generation, source_line_no, source_offset, record_fingerprint, suffix
344 );
345
346 let mut hasher = Sha256::new();
347 hasher.update(material.as_bytes());
348 format!("{:x}", hasher.finalize())
349 }
350
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`.
fn event_version() -> u64 {
    let millis = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    };
    millis as u64
}
358
359 fn raw_hash(raw_json: &str) -> u64 {
360 let mut hasher = Sha256::new();
361 hasher.update(raw_json.as_bytes());
362 let digest = hasher.finalize();
363 let hex = format!("{:x}", digest);
364 u64::from_str_radix(&hex[..16], 16).unwrap_or(0)
365 }
366
367 fn io_hash(input_json: &str, output_json: &str) -> u64 {
368 raw_hash(&format!("{}\n{}", input_json, output_json))
369 }
370
/// Per-record values shared by every row built from one source record.
///
/// The row builders in this file copy these fields onto the event, link, and
/// tool rows they produce.
struct RecordContext<'a> {
    /// Written to rows as `source_name`.
    source_name: &'a str,
    /// Written to rows as `provider`.
    provider: &'a str,
    /// Written to rows as `session_id`.
    session_id: &'a str,
    /// Written to event rows as `session_date`.
    session_date: &'a str,
    /// Path of the source file; also the first component of `source_ref`.
    source_file: &'a str,
    /// Written to event rows as `source_inode`.
    // NOTE(review): presumably the filesystem inode of `source_file` — confirm with the caller.
    source_inode: u64,
    /// Second component of `source_ref`.
    // NOTE(review): presumably incremented when the file is rotated or rewritten — confirm.
    source_generation: u32,
    /// Line number of the record; third component of `source_ref`.
    source_line_no: u64,
    /// Written to event rows as `source_offset`.
    // NOTE(review): presumably a byte offset within the file — confirm.
    source_offset: u64,
    /// Written to event rows as `record_ts`.
    // NOTE(review): presumably the raw timestamp text from the record — confirm.
    record_ts: &'a str,
    /// Written to event rows as `event_ts`.
    // NOTE(review): presumably the output of `parse_event_ts` — confirm.
    event_ts: &'a str,
}
384
385 fn base_event_obj(
386 ctx: &RecordContext<'_>,
387 event_uid: &str,
388 event_kind: &str,
389 payload_type: &str,
390 actor_kind: &str,
391 text_content: &str,
392 payload_json: &str,
393 ) -> Map<String, Value> {
394 let text_content = truncate_chars(text_content, TEXT_LIMIT);
395 let event_kind = canonicalize_event_kind(event_kind);
396 let payload_type = canonicalize_payload_type(payload_type);
397 let mut obj = Map::<String, Value>::new();
398 obj.insert(
399 "event_uid".to_string(),
400 Value::String(event_uid.to_string()),
401 );
402 obj.insert(
403 "session_id".to_string(),
404 Value::String(ctx.session_id.to_string()),
405 );
406 obj.insert(
407 "session_date".to_string(),
408 Value::String(ctx.session_date.to_string()),
409 );
410 obj.insert(
411 "source_name".to_string(),
412 Value::String(ctx.source_name.to_string()),
413 );
414 obj.insert(
415 "provider".to_string(),
416 Value::String(ctx.provider.to_string()),
417 );
418 obj.insert(
419 "source_file".to_string(),
420 Value::String(ctx.source_file.to_string()),
421 );
422 obj.insert("source_inode".to_string(), json!(ctx.source_inode));
423 obj.insert(
424 "source_generation".to_string(),
425 json!(ctx.source_generation),
426 );
427 obj.insert("source_line_no".to_string(), json!(ctx.source_line_no));
428 obj.insert("source_offset".to_string(), json!(ctx.source_offset));
429 obj.insert(
430 "source_ref".to_string(),
431 Value::String(format!(
432 "{}:{}:{}",
433 ctx.source_file, ctx.source_generation, ctx.source_line_no
434 )),
435 );
436 obj.insert(
437 "record_ts".to_string(),
438 Value::String(ctx.record_ts.to_string()),
439 );
440 obj.insert(
441 "event_ts".to_string(),
442 Value::String(ctx.event_ts.to_string()),
443 );
444 obj.insert(
445 "event_kind".to_string(),
446 Value::String(event_kind.to_string()),
447 );
448 obj.insert(
449 "actor_kind".to_string(),
450 Value::String(actor_kind.to_string()),
451 );
452 obj.insert(
453 "payload_type".to_string(),
454 Value::String(payload_type.to_string()),
455 );
456 obj.insert("op_kind".to_string(), Value::String(String::new()));
457 obj.insert("op_status".to_string(), Value::String(String::new()));
458 obj.insert("request_id".to_string(), Value::String(String::new()));
459 obj.insert("trace_id".to_string(), Value::String(String::new()));
460 obj.insert("turn_index".to_string(), json!(0u32));
461 obj.insert("item_id".to_string(), Value::String(String::new()));
462 obj.insert("tool_call_id".to_string(), Value::String(String::new()));
463 obj.insert(
464 "parent_tool_call_id".to_string(),
465 Value::String(String::new()),
466 );
467 obj.insert("origin_event_id".to_string(), Value::String(String::new()));
468 obj.insert(
469 "origin_tool_call_id".to_string(),
470 Value::String(String::new()),
471 );
472 obj.insert("tool_name".to_string(), Value::String(String::new()));
473 obj.insert("tool_phase".to_string(), Value::String(String::new()));
474 obj.insert("tool_error".to_string(), json!(0u8));
475 obj.insert("agent_run_id".to_string(), Value::String(String::new()));
476 obj.insert("agent_label".to_string(), Value::String(String::new()));
477 obj.insert("coord_group_id".to_string(), Value::String(String::new()));
478 obj.insert(
479 "coord_group_label".to_string(),
480 Value::String(String::new()),
481 );
482 obj.insert("is_substream".to_string(), json!(0u8));
483 obj.insert("model".to_string(), Value::String(String::new()));
484 obj.insert("input_tokens".to_string(), json!(0u32));
485 obj.insert("output_tokens".to_string(), json!(0u32));
486 obj.insert("cache_read_tokens".to_string(), json!(0u32));
487 obj.insert("cache_write_tokens".to_string(), json!(0u32));
488 obj.insert("latency_ms".to_string(), json!(0u32));
489 obj.insert("retry_count".to_string(), json!(0u16));
490 obj.insert("service_tier".to_string(), Value::String(String::new()));
491 obj.insert("content_types".to_string(), json!([]));
492 obj.insert("has_reasoning".to_string(), json!(0u8));
493 obj.insert(
494 "text_content".to_string(),
495 Value::String(text_content.clone()),
496 );
497 obj.insert(
498 "text_preview".to_string(),
499 Value::String(truncate_chars(&text_content, PREVIEW_LIMIT)),
500 );
501 obj.insert(
502 "payload_json".to_string(),
503 Value::String(payload_json.to_string()),
504 );
505 obj.insert("token_usage_json".to_string(), Value::String(String::new()));
506 obj.insert("event_version".to_string(), json!(event_version()));
507 obj
508 }
509
510 fn build_link_row(
511 ctx: &RecordContext<'_>,
512 event_uid: &str,
513 linked_event_uid: &str,
514 linked_external_id: &str,
515 link_type: &str,
516 metadata_json: &str,
517 ) -> Value {
518 let link_type = canonicalize_link_type(link_type);
519 json!({
520 "event_uid": event_uid,
521 "linked_event_uid": linked_event_uid,
522 "linked_external_id": linked_external_id,
523 "link_type": link_type,
524 "session_id": ctx.session_id,
525 "provider": ctx.provider,
526 "source_name": ctx.source_name,
527 "metadata_json": metadata_json,
528 "event_version": event_version(),
529 })
530 }
531
532 fn build_event_link_row(
533 ctx: &RecordContext<'_>,
534 event_uid: &str,
535 linked_event_uid: &str,
536 link_type: &str,
537 metadata_json: &str,
538 ) -> Value {
539 build_link_row(
540 ctx,
541 event_uid,
542 linked_event_uid,
543 "",
544 link_type,
545 metadata_json,
546 )
547 }
548
549 fn build_external_link_row(
550 ctx: &RecordContext<'_>,
551 event_uid: &str,
552 linked_external_id: &str,
553 link_type: &str,
554 metadata_json: &str,
555 ) -> Value {
556 build_link_row(
557 ctx,
558 event_uid,
559 "",
560 linked_external_id,
561 link_type,
562 metadata_json,
563 )
564 }
565
566 fn build_tool_row(
567 ctx: &RecordContext<'_>,
568 event_uid: &str,
569 tool_call_id: &str,
570 parent_tool_call_id: &str,
571 tool_name: &str,
572 tool_phase: &str,
573 tool_error: u8,
574 input_json: &str,
575 output_json: &str,
576 output_text: &str,
577 ) -> Value {
578 let input_json = truncate_chars(input_json, TEXT_LIMIT);
579 let output_json = truncate_chars(output_json, TEXT_LIMIT);
580 let output_text = truncate_chars(output_text, TEXT_LIMIT);
581
582 json!({
583 "event_uid": event_uid,
584 "session_id": ctx.session_id,
585 "provider": ctx.provider,
586 "source_name": ctx.source_name,
587 "tool_call_id": tool_call_id,
588 "parent_tool_call_id": parent_tool_call_id,
589 "tool_name": tool_name,
590 "tool_phase": tool_phase,
591 "tool_error": tool_error,
592 "input_json": input_json,
593 "output_json": output_json,
594 "output_text": output_text,
595 "input_bytes": input_json.len() as u32,
596 "output_bytes": output_json.len() as u32,
597 "input_preview": truncate_chars(&input_json, PREVIEW_LIMIT),
598 "output_preview": truncate_chars(&output_text, PREVIEW_LIMIT),
599 "io_hash": io_hash(&input_json, &output_json),
600 "source_ref": format!("{}:{}:{}", ctx.source_file, ctx.source_generation, ctx.source_line_no),
601 "event_version": event_version(),
602 })
603 }
604
/// Normalizes one Codex log record into event, link, and tool rows.
///
/// Dispatches on `top_type` and, for `response_item` and `event_msg` records,
/// on the `type` field of the record's `payload` object. Every branch emits at
/// least one event row built with `base_event_obj`; tool call and tool result
/// branches additionally emit a tool row, and `compacted` records emit one
/// extra event plus a `compacted_parent` link per `replacement_history` item.
///
/// After dispatch, every event row without a model is backfilled from the
/// payload's `model` field or, failing that, from `model_hint`. If the record
/// carries a non-empty `parent_id`, a `parent_event` link is added from the
/// first event.
///
/// Returns `(events, links, tools)`.
fn normalize_codex_event(
    record: &Value,
    ctx: &RecordContext<'_>,
    top_type: &str,
    base_uid: &str,
    model_hint: &str,
) -> (Vec<Value>, Vec<Value>, Vec<Value>) {
    let mut events = Vec::<Value>::new();
    let mut links = Vec::<Value>::new();
    let mut tools = Vec::<Value>::new();

    // A missing or non-object `payload` is treated as an empty object, so
    // `payload_json` is always a JSON object string.
    let payload = record.get("payload").cloned().unwrap_or(Value::Null);
    let payload_obj = payload.as_object().cloned().unwrap_or_else(Map::new);
    let payload_json = compact_json(&Value::Object(payload_obj.clone()));

    // Adds an external `parent_event` link unless the parent id is empty.
    let push_parent_link = |links: &mut Vec<Value>, uid: &str, parent: &str| {
        if !parent.is_empty() {
            links.push(build_external_link_row(
                ctx,
                uid,
                parent,
                "parent_event",
                "{}",
            ));
        }
    };

    match top_type {
        // Session header: only the payload `id` is lifted into a column.
        "session_meta" => {
            let mut row = base_event_obj(
                ctx,
                base_uid,
                "session_meta",
                "session_meta",
                "system",
                "",
                &payload_json,
            );
            row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
            events.push(Value::Object(row));
        }
        // Turn header: `turn_id` feeds the numeric turn index as well as the
        // request and item ids.
        "turn_context" => {
            let mut row = base_event_obj(
                ctx,
                base_uid,
                "turn_context",
                "turn_context",
                "system",
                "",
                &payload_json,
            );
            // `to_u32` yields 0 when `turn_id` is not an unsigned integer.
            row.insert(
                "turn_index".to_string(),
                json!(to_u32(payload_obj.get("turn_id"))),
            );
            let turn_id = to_str(payload_obj.get("turn_id"));
            if !turn_id.is_empty() {
                row.insert("request_id".to_string(), json!(turn_id.clone()));
                row.insert("item_id".to_string(), json!(turn_id));
            }
            let model = canonicalize_model("codex", &to_str(payload_obj.get("model")));
            if !model.is_empty() {
                row.insert("model".to_string(), json!(model));
            }
            events.push(Value::Object(row));
        }
        // Response items wrap the actual item in `payload`; dispatch on its type.
        "response_item" => {
            let payload_type = to_str(payload_obj.get("type"));
            match payload_type.as_str() {
                "message" => {
                    let role = to_str(payload_obj.get("role"));
                    let content = payload_obj.get("content").cloned().unwrap_or(Value::Null);
                    let text = extract_message_text(&content);
                    // A message without a role is attributed to the assistant.
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "message",
                        "message",
                        if role.is_empty() {
                            "assistant"
                        } else {
                            role.as_str()
                        },
                        &text,
                        &payload_json,
                    );
                    row.insert(
                        "content_types".to_string(),
                        json!(extract_content_types(&content)),
                    );
                    row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
                    row.insert(
                        "op_status".to_string(),
                        json!(to_str(payload_obj.get("phase"))),
                    );
                    events.push(Value::Object(row));
                }
                // Tool request: the arguments become both the event text and
                // the tool row input.
                "function_call" => {
                    let args = to_str(payload_obj.get("arguments"));
                    let call_id = to_str(payload_obj.get("call_id"));
                    let name = to_str(payload_obj.get("name"));
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "tool_call",
                        "function_call",
                        "assistant",
                        &args,
                        &payload_json,
                    );
                    row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                    row.insert("tool_name".to_string(), json!(name.clone()));
                    events.push(Value::Object(row));

                    tools.push(build_tool_row(
                        ctx, base_uid, &call_id, "", &name, "request", 0, &args, "", "",
                    ));
                }
                // Tool response: the tool row carries the output both as
                // compact JSON and as text; the tool name is left empty.
                "function_call_output" => {
                    let output = to_str(payload_obj.get("output"));
                    let call_id = to_str(payload_obj.get("call_id"));
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "tool_result",
                        "function_call_output",
                        "tool",
                        &output,
                        &payload_json,
                    );
                    row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                    events.push(Value::Object(row));

                    tools.push(build_tool_row(
                        ctx,
                        base_uid,
                        &call_id,
                        "",
                        "",
                        "response",
                        0,
                        "",
                        &compact_json(payload_obj.get("output").unwrap_or(&Value::Null)),
                        &output,
                    ));
                }
                // Custom tool request: like `function_call`, but the input is
                // read from `input` and the payload `status` is recorded.
                "custom_tool_call" => {
                    let input = to_str(payload_obj.get("input"));
                    let call_id = to_str(payload_obj.get("call_id"));
                    let name = to_str(payload_obj.get("name"));
                    let status = to_str(payload_obj.get("status"));
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "tool_call",
                        "custom_tool_call",
                        "assistant",
                        &input,
                        &payload_json,
                    );
                    row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                    row.insert("tool_name".to_string(), json!(name.clone()));
                    row.insert("op_status".to_string(), json!(status));
                    events.push(Value::Object(row));

                    tools.push(build_tool_row(
                        ctx, base_uid, &call_id, "", &name, "request", 0, &input, "", "",
                    ));
                }
                "custom_tool_call_output" => {
                    let output = to_str(payload_obj.get("output"));
                    let call_id = to_str(payload_obj.get("call_id"));
                    let status = to_str(payload_obj.get("status"));
                    // If the output text is itself JSON, store the re-compacted
                    // parsed form; otherwise store the raw `output` value as JSON.
                    let output_json = serde_json::from_str::<Value>(&output)
                        .map(|parsed| compact_json(&parsed))
                        .unwrap_or_else(|_| {
                            compact_json(payload_obj.get("output").unwrap_or(&Value::Null))
                        });

                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "tool_result",
                        "custom_tool_call_output",
                        "tool",
                        &output,
                        &payload_json,
                    );
                    row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                    row.insert("op_status".to_string(), json!(status));
                    events.push(Value::Object(row));

                    tools.push(build_tool_row(
                        ctx,
                        base_uid,
                        &call_id,
                        "",
                        "",
                        "response",
                        0,
                        "",
                        &output_json,
                        &output,
                    ));
                }
                // Web search: produces an event only (no tool row); the payload
                // status is stored as both `op_status` and `tool_phase`.
                "web_search_call" => {
                    let action = payload_obj.get("action").cloned().unwrap_or(Value::Null);
                    let action_type = to_str(action.get("type"));
                    let status = to_str(payload_obj.get("status"));
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "tool_call",
                        "web_search_call",
                        "assistant",
                        &extract_message_text(&action),
                        &payload_json,
                    );
                    row.insert("tool_name".to_string(), json!("web_search"));
                    row.insert("op_kind".to_string(), json!(action_type));
                    row.insert("op_status".to_string(), json!(status.clone()));
                    row.insert("tool_phase".to_string(), json!(status));
                    events.push(Value::Object(row));
                }
                // Reasoning: the event text comes from the `summary` field.
                "reasoning" => {
                    let summary = payload_obj.get("summary").cloned().unwrap_or(Value::Null);
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "reasoning",
                        "reasoning",
                        "assistant",
                        &extract_message_text(&summary),
                        &payload_json,
                    );
                    row.insert("has_reasoning".to_string(), json!(1u8));
                    row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
                    events.push(Value::Object(row));
                }
                // Unrecognized item type: keep the record as an `unknown` event.
                // `base_event_obj` maps out-of-domain payload types to `unknown`.
                _ => {
                    events.push(Value::Object(base_event_obj(
                        ctx,
                        base_uid,
                        "unknown",
                        if payload_type.is_empty() {
                            "response_item"
                        } else {
                            payload_type.as_str()
                        },
                        "system",
                        &extract_message_text(&payload),
                        &payload_json,
                    )));
                }
            }
        }
        // Event messages: one row per record, with the actor derived from the
        // payload type.
        "event_msg" => {
            let payload_type = to_str(payload_obj.get("type"));
            let actor = match payload_type.as_str() {
                "user_message" => "user",
                "agent_message" | "agent_reasoning" => "assistant",
                _ => "system",
            };
            let mut row = base_event_obj(
                ctx,
                base_uid,
                "event_msg",
                if payload_type.is_empty() {
                    "event_msg"
                } else {
                    payload_type.as_str()
                },
                actor,
                &extract_message_text(&payload),
                &payload_json,
            );
            let turn_id = to_str(payload_obj.get("turn_id"));
            if !turn_id.is_empty() {
                row.insert("request_id".to_string(), json!(turn_id.clone()));
                row.insert("item_id".to_string(), json!(turn_id));
            }
            let status = to_str(payload_obj.get("status"));
            if !status.is_empty() {
                row.insert("op_status".to_string(), json!(status));
            }
            if payload_type == "token_count" {
                // Token usage is read from `info.last_token_usage`.
                let usage = payload_obj
                    .get("info")
                    .and_then(|v| v.get("last_token_usage"));
                let input_tokens = to_u32(usage.and_then(|v| v.get("input_tokens")));
                let output_tokens = to_u32(usage.and_then(|v| v.get("output_tokens")));
                // Each cache counter is looked up under two alternative key
                // names; the first one present wins.
                let cache_read_tokens = to_u32(
                    usage
                        .and_then(|v| v.get("cached_input_tokens"))
                        .or_else(|| usage.and_then(|v| v.get("cache_read_input_tokens"))),
                );
                let cache_write_tokens = to_u32(
                    usage
                        .and_then(|v| v.get("cache_creation_input_tokens"))
                        .or_else(|| usage.and_then(|v| v.get("cache_write_input_tokens"))),
                );

                // Model precedence: rate-limit name, payload `model`,
                // rate-limit id, then the caller's `model_hint`.
                let model = to_str(
                    payload_obj
                        .get("rate_limits")
                        .and_then(|v| v.get("limit_name")),
                );
                let fallback_model = to_str(payload_obj.get("model"));
                let fallback_limit_id = to_str(
                    payload_obj
                        .get("rate_limits")
                        .and_then(|v| v.get("limit_id")),
                );
                let resolved_model = if !model.is_empty() {
                    canonicalize_model("codex", &model)
                } else if !fallback_model.is_empty() {
                    canonicalize_model("codex", &fallback_model)
                } else if !fallback_limit_id.is_empty() {
                    canonicalize_model("codex", &fallback_limit_id)
                } else {
                    canonicalize_model("codex", model_hint)
                };

                row.insert("input_tokens".to_string(), json!(input_tokens));
                row.insert("output_tokens".to_string(), json!(output_tokens));
                row.insert("cache_read_tokens".to_string(), json!(cache_read_tokens));
                row.insert("cache_write_tokens".to_string(), json!(cache_write_tokens));
                if !resolved_model.is_empty() {
                    row.insert("model".to_string(), json!(resolved_model));
                }
                row.insert(
                    "service_tier".to_string(),
                    json!(to_str(
                        payload_obj
                            .get("rate_limits")
                            .and_then(|v| v.get("plan_type"))
                    )),
                );
                row.insert(
                    "token_usage_json".to_string(),
                    json!(compact_json(&payload)),
                );
            } else if payload_type == "agent_reasoning" {
                row.insert("has_reasoning".to_string(), json!(1u8));
            }
            events.push(Value::Object(row));
        }
        // Compaction: one `compacted_raw` event for the record itself, then one
        // event per `replacement_history` item, each linked back to it.
        "compacted" => {
            events.push(Value::Object(base_event_obj(
                ctx,
                base_uid,
                "compacted_raw",
                "compacted",
                "system",
                "",
                &payload_json,
            )));

            if let Some(Value::Array(items)) = payload_obj.get("replacement_history") {
                for (idx, item) in items.iter().enumerate() {
                    // Each item's uid is derived from the record's source
                    // coordinates, the item's JSON, and its index.
                    let item_uid = event_uid(
                        ctx.source_file,
                        ctx.source_generation,
                        ctx.source_line_no,
                        ctx.source_offset,
                        &compact_json(item),
                        &format!("compacted:{}", idx),
                    );
                    let item_type = to_str(item.get("type"));

                    let (kind, payload_type, actor, text) = match item_type.as_str() {
                        "message" => (
                            "message",
                            "message",
                            to_str(item.get("role")),
                            extract_message_text(item.get("content").unwrap_or(&Value::Null)),
                        ),
                        "function_call" => (
                            "tool_call",
                            "function_call",
                            "assistant".to_string(),
                            to_str(item.get("arguments")),
                        ),
                        "function_call_output" => (
                            "tool_result",
                            "function_call_output",
                            "tool".to_string(),
                            to_str(item.get("output")),
                        ),
                        "reasoning" => (
                            "reasoning",
                            "reasoning",
                            "assistant".to_string(),
                            extract_message_text(item.get("summary").unwrap_or(&Value::Null)),
                        ),
                        _ => (
                            "unknown",
                            if item_type.is_empty() {
                                "unknown"
                            } else {
                                item_type.as_str()
                            },
                            "system".to_string(),
                            extract_message_text(item),
                        ),
                    };

                    // An empty actor (a message without a role) defaults to
                    // the assistant.
                    let mut row = base_event_obj(
                        ctx,
                        &item_uid,
                        kind,
                        payload_type,
                        if actor.is_empty() {
                            "assistant"
                        } else {
                            actor.as_str()
                        },
                        &text,
                        &compact_json(item),
                    );
                    row.insert("origin_event_id".to_string(), json!(base_uid));
                    events.push(Value::Object(row));

                    links.push(build_event_link_row(
                        ctx,
                        &item_uid,
                        base_uid,
                        "compacted_parent",
                        "{}",
                    ));
                }
            }
        }
        // Bare item records: same shapes as the `response_item` cases, but the
        // fields are read from the record itself rather than from `payload`,
        // and the whole record is stored as the payload JSON.
        "message" | "function_call" | "function_call_output" | "reasoning" => {
            let event = if top_type == "message" {
                let role = to_str(record.get("role"));
                let text = extract_message_text(record.get("content").unwrap_or(&Value::Null));
                let mut row = base_event_obj(
                    ctx,
                    base_uid,
                    "message",
                    "message",
                    if role.is_empty() {
                        "assistant"
                    } else {
                        role.as_str()
                    },
                    &text,
                    &compact_json(record),
                );
                row.insert(
                    "content_types".to_string(),
                    json!(extract_content_types(
                        record.get("content").unwrap_or(&Value::Null)
                    )),
                );
                Value::Object(row)
            } else if top_type == "function_call" {
                let args = to_str(record.get("arguments"));
                let call_id = to_str(record.get("call_id"));
                let name = to_str(record.get("name"));
                let mut row = base_event_obj(
                    ctx,
                    base_uid,
                    "tool_call",
                    "function_call",
                    "assistant",
                    &args,
                    &compact_json(record),
                );
                row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                row.insert("tool_name".to_string(), json!(name.clone()));
                tools.push(build_tool_row(
                    ctx, base_uid, &call_id, "", &name, "request", 0, &args, "", "",
                ));
                Value::Object(row)
            } else if top_type == "function_call_output" {
                let output = to_str(record.get("output"));
                let call_id = to_str(record.get("call_id"));
                let mut row = base_event_obj(
                    ctx,
                    base_uid,
                    "tool_result",
                    "function_call_output",
                    "tool",
                    &output,
                    &compact_json(record),
                );
                row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                tools.push(build_tool_row(
                    ctx,
                    base_uid,
                    &call_id,
                    "",
                    "",
                    "response",
                    0,
                    "",
                    &compact_json(record.get("output").unwrap_or(&Value::Null)),
                    &output,
                ));
                Value::Object(row)
            } else {
                // Remaining case of this arm: `reasoning`.
                let summary = record.get("summary").cloned().unwrap_or(Value::Null);
                let mut row = base_event_obj(
                    ctx,
                    base_uid,
                    "reasoning",
                    "reasoning",
                    "assistant",
                    &extract_message_text(&summary),
                    &compact_json(record),
                );
                row.insert("has_reasoning".to_string(), json!(1u8));
                Value::Object(row)
            };

            events.push(event);
        }
        // Unrecognized top-level type: keep the whole record as an `unknown` event.
        _ => {
            events.push(Value::Object(base_event_obj(
                ctx,
                base_uid,
                "unknown",
                if top_type.is_empty() {
                    "unknown"
                } else {
                    top_type
                },
                "system",
                &extract_message_text(record),
                &compact_json(record),
            )));
        }
    }

    // Backfill the model on every event. Precedence: the model already on the
    // row, then the payload's `model`, then the inherited `model_hint`.
    let payload_model = canonicalize_model("codex", &to_str(payload_obj.get("model")));
    let inherited_model = canonicalize_model("codex", model_hint);
    for event in &mut events {
        if let Some(row) = event.as_object_mut() {
            let row_model = canonicalize_model("codex", &to_str(row.get("model")));
            let resolved_model = if !row_model.is_empty() {
                row_model
            } else if !payload_model.is_empty() {
                payload_model.clone()
            } else {
                inherited_model.clone()
            };

            if !resolved_model.is_empty() {
                row.insert("model".to_string(), json!(resolved_model));
            }
        }
    }

    // Link the first event to the record's `parent_id`, when one is present.
    let parent = to_str(record.get("parent_id"));
    if !events.is_empty() && !parent.is_empty() {
        if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
            push_parent_link(&mut links, uid, &parent);
        }
    }

    (events, links, tools)
}
1169
1170 fn normalize_claude_event(
1171 record: &Value,
1172 ctx: &RecordContext<'_>,
1173 top_type: &str,
1174 base_uid: &str,
1175 ) -> (Vec<Value>, Vec<Value>, Vec<Value>) {
1176 let mut events = Vec::<Value>::new();
1177 let mut links = Vec::<Value>::new();
1178 let mut tools = Vec::<Value>::new();
1179
1180 let parent_uuid = to_str(record.get("parentUuid"));
1181 let request_id = to_str(record.get("requestId"));
1182 let trace_id = to_str(record.get("requestId"));
1183 let agent_run_id = to_str(record.get("agentId"));
1184 let agent_label = to_str(record.get("agentName"));
1185 let coord_group_label = to_str(record.get("teamName"));
1186 let is_substream = to_u8_bool(record.get("isSidechain"));
1187
1188 let message = record.get("message").cloned().unwrap_or(Value::Null);
1189 let msg_role = to_str(message.get("role"));
1190 let model = canonicalize_model("claude", &to_str(message.get("model")));
1191
1192 let usage = message.get("usage").cloned().unwrap_or(Value::Null);
1193 let input_tokens = to_u32(usage.get("input_tokens"));
1194 let output_tokens = to_u32(usage.get("output_tokens"));
1195 let cache_read_tokens = to_u32(usage.get("cache_read_input_tokens"));
1196 let cache_write_tokens = to_u32(usage.get("cache_creation_input_tokens"));
1197 let service_tier = to_str(usage.get("service_tier"));
1198
1199 let stamp_common = |obj: &mut Map<String, Value>| {
1200 obj.insert("request_id".to_string(), json!(request_id.clone()));
1201 obj.insert("trace_id".to_string(), json!(trace_id.clone()));
1202 obj.insert("agent_run_id".to_string(), json!(agent_run_id.clone()));
1203 obj.insert("agent_label".to_string(), json!(agent_label.clone()));
1204 obj.insert(
1205 "coord_group_label".to_string(),
1206 json!(coord_group_label.clone()),
1207 );
1208 obj.insert("is_substream".to_string(), json!(is_substream));
1209 obj.insert("model".to_string(), json!(model.clone()));
1210 obj.insert("input_tokens".to_string(), json!(input_tokens));
1211 obj.insert("output_tokens".to_string(), json!(output_tokens));
1212 obj.insert("cache_read_tokens".to_string(), json!(cache_read_tokens));
1213 obj.insert("cache_write_tokens".to_string(), json!(cache_write_tokens));
1214 obj.insert("service_tier".to_string(), json!(service_tier.clone()));
1215 obj.insert("item_id".to_string(), json!(to_str(record.get("uuid"))));
1216 obj.insert(
1217 "origin_event_id".to_string(),
1218 json!(to_str(record.get("sourceToolAssistantUUID"))),
1219 );
1220 obj.insert(
1221 "origin_tool_call_id".to_string(),
1222 json!(to_str(record.get("sourceToolUseID"))),
1223 );
1224 };
1225
1226 if top_type == "assistant" || top_type == "user" {
1227 let actor = if top_type == "assistant" {
1228 "assistant"
1229 } else if msg_role == "assistant" {
1230 "assistant"
1231 } else {
1232 "user"
1233 };
1234
1235 let content = message.get("content").cloned().unwrap_or_else(|| {
1236 record
1237 .get("message")
1238 .and_then(|m| m.get("content"))
1239 .cloned()
1240 .unwrap_or(Value::Null)
1241 });
1242
1243 match content {
1244 Value::Array(items) if !items.is_empty() => {
1245 for (idx, item) in items.iter().enumerate() {
1246 let block_type = to_str(item.get("type"));
1247 let suffix = format!("claude:block:{}", idx);
1248 let block_uid = event_uid(
1249 ctx.source_file,
1250 ctx.source_generation,
1251 ctx.source_line_no,
1252 ctx.source_offset,
1253 &compact_json(item),
1254 &suffix,
1255 );
1256
1257 let mut row = match block_type.as_str() {
1258 "thinking" => {
1259 let mut r = base_event_obj(
1260 ctx,
1261 &block_uid,
1262 "reasoning",
1263 "thinking",
1264 "assistant",
1265 &extract_message_text(item),
1266 &compact_json(item),
1267 );
1268 r.insert("has_reasoning".to_string(), json!(1u8));
1269 r.insert("content_types".to_string(), json!(["thinking"]));
1270 r
1271 }
1272 "tool_use" => {
1273 let tool_call_id = to_str(item.get("id"));
1274 let tool_name = to_str(item.get("name"));
1275 let input_json =
1276 compact_json(item.get("input").unwrap_or(&Value::Null));
1277 let mut r = base_event_obj(
1278 ctx,
1279 &block_uid,
1280 "tool_call",
1281 "tool_use",
1282 "assistant",
1283 &extract_message_text(item.get("input").unwrap_or(&Value::Null)),
1284 &compact_json(item),
1285 );
1286 r.insert("content_types".to_string(), json!(["tool_use"]));
1287 r.insert("tool_call_id".to_string(), json!(tool_call_id.clone()));
1288 r.insert("tool_name".to_string(), json!(tool_name.clone()));
1289 tools.push(build_tool_row(
1290 ctx,
1291 &block_uid,
1292 &tool_call_id,
1293 &to_str(record.get("parentToolUseID")),
1294 &tool_name,
1295 "request",
1296 0,
1297 &input_json,
1298 "",
1299 "",
1300 ));
1301 r
1302 }
1303 "tool_result" => {
1304 let tool_call_id = to_str(item.get("tool_use_id"));
1305 let output_json =
1306 compact_json(item.get("content").unwrap_or(&Value::Null));
1307 let output_text =
1308 extract_message_text(item.get("content").unwrap_or(&Value::Null));
1309 let tool_error = to_u8_bool(item.get("is_error"));
1310 let mut r = base_event_obj(
1311 ctx,
1312 &block_uid,
1313 "tool_result",
1314 "tool_result",
1315 "tool",
1316 &output_text,
1317 &compact_json(item),
1318 );
1319 r.insert("content_types".to_string(), json!(["tool_result"]));
1320 r.insert("tool_call_id".to_string(), json!(tool_call_id.clone()));
1321 r.insert("tool_error".to_string(), json!(tool_error));
1322 tools.push(build_tool_row(
1323 ctx,
1324 &block_uid,
1325 &tool_call_id,
1326 &to_str(record.get("parentToolUseID")),
1327 "",
1328 "response",
1329 tool_error,
1330 "",
1331 &output_json,
1332 &output_text,
1333 ));
1334 r
1335 }
1336 _ => {
1337 let mut r = base_event_obj(
1338 ctx,
1339 &block_uid,
1340 "message",
1341 if block_type.is_empty() {
1342 "text"
1343 } else {
1344 block_type.as_str()
1345 },
1346 actor,
1347 &extract_message_text(item),
1348 &compact_json(item),
1349 );
1350 if !block_type.is_empty() {
1351 r.insert("content_types".to_string(), json!([block_type]));
1352 }
1353 r
1354 }
1355 };
1356
1357 stamp_common(&mut row);
1358 row.insert(
1359 "parent_tool_call_id".to_string(),
1360 json!(to_str(record.get("parentToolUseID"))),
1361 );
1362 row.insert(
1363 "origin_tool_call_id".to_string(),
1364 json!(to_str(record.get("sourceToolUseID"))),
1365 );
1366 row.insert(
1367 "tool_phase".to_string(),
1368 json!(to_str(record.get("stop_reason"))),
1369 );
1370 events.push(Value::Object(row));
1371
1372 if !parent_uuid.is_empty() {
1373 links.push(build_external_link_row(
1374 ctx,
1375 &block_uid,
1376 &parent_uuid,
1377 "parent_uuid",
1378 "{}",
1379 ));
1380 }
1381 }
1382 }
1383 _ => {
1384 let text = extract_message_text(&message);
1385 let mut row = base_event_obj(
1386 ctx,
1387 base_uid,
1388 "message",
1389 "message",
1390 actor,
1391 &text,
1392 &compact_json(record),
1393 );
1394 row.insert(
1395 "content_types".to_string(),
1396 json!(extract_content_types(
1397 message.get("content").unwrap_or(&Value::Null)
1398 )),
1399 );
1400 stamp_common(&mut row);
1401 events.push(Value::Object(row));
1402 if !parent_uuid.is_empty() {
1403 links.push(build_external_link_row(
1404 ctx,
1405 base_uid,
1406 &parent_uuid,
1407 "parent_uuid",
1408 "{}",
1409 ));
1410 }
1411 }
1412 }
1413 } else {
1414 let event_kind = match top_type {
1415 "progress" => "progress",
1416 "system" => "system",
1417 "summary" => "summary",
1418 "queue-operation" => "queue_operation",
1419 "file-history-snapshot" => "file_history_snapshot",
1420 _ => "unknown",
1421 };
1422
1423 let payload_type = if top_type == "progress" {
1424 to_str(record.get("data").and_then(|d| d.get("type")))
1425 } else if top_type == "system" {
1426 to_str(record.get("subtype"))
1427 } else {
1428 top_type.to_string()
1429 };
1430
1431 let mut row = base_event_obj(
1432 ctx,
1433 base_uid,
1434 event_kind,
1435 if payload_type.is_empty() {
1436 top_type
1437 } else {
1438 payload_type.as_str()
1439 },
1440 "system",
1441 &extract_message_text(record),
1442 &compact_json(record),
1443 );
1444 row.insert("op_kind".to_string(), json!(payload_type));
1445 row.insert("op_status".to_string(), json!(to_str(record.get("status"))));
1446 row.insert(
1447 "latency_ms".to_string(),
1448 json!(to_u32(record.get("durationMs"))),
1449 );
1450 row.insert(
1451 "retry_count".to_string(),
1452 json!(to_u16(record.get("retryAttempt"))),
1453 );
1454 stamp_common(&mut row);
1455 events.push(Value::Object(row));
1456
1457 if !parent_uuid.is_empty() {
1458 links.push(build_external_link_row(
1459 ctx,
1460 base_uid,
1461 &parent_uuid,
1462 "parent_uuid",
1463 "{}",
1464 ));
1465 }
1466 }
1467
1468 if !events.is_empty() {
1469 let tool_use_id = to_str(record.get("toolUseID"));
1470 if !tool_use_id.is_empty() {
1471 if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
1472 links.push(build_external_link_row(
1473 ctx,
1474 uid,
1475 &tool_use_id,
1476 "tool_use_id",
1477 "{}",
1478 ));
1479 }
1480 }
1481
1482 let source_tool_assistant = to_str(record.get("sourceToolAssistantUUID"));
1483 if !source_tool_assistant.is_empty() {
1484 if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
1485 links.push(build_external_link_row(
1486 ctx,
1487 uid,
1488 &source_tool_assistant,
1489 "source_tool_assistant",
1490 "{}",
1491 ));
1492 }
1493 }
1494 }
1495
1496 (events, links, tools)
1497 }
1498
1499 pub fn normalize_record(
1500 record: &Value,
1501 source_name: &str,
1502 provider: &str,
1503 source_file: &str,
1504 source_inode: u64,
1505 source_generation: u32,
1506 source_line_no: u64,
1507 source_offset: u64,
1508 session_hint: &str,
1509 model_hint: &str,
1510 ) -> Result<NormalizedRecord> {
1511 let provider = Provider::parse(provider)?;
1512 let provider_name = provider.as_str();
1513 let record_ts = to_str(record.get("timestamp"));
1514 let (event_ts, event_ts_parse_failed) = parse_event_ts(&record_ts);
1515 let top_type = to_str(record.get("type"));
1516
1517 let mut session_id = if provider == Provider::Claude {
1518 to_str(record.get("sessionId"))
1519 } else {
1520 String::new()
1521 };
1522 if session_id.is_empty() {
1523 session_id = if session_hint.is_empty() {
1524 infer_session_id_from_file(source_file)
1525 } else {
1526 session_hint.to_string()
1527 };
1528 }
1529
1530 if provider == Provider::Codex && top_type == "session_meta" {
1531 let payload = record.get("payload").cloned().unwrap_or(Value::Null);
1532 let payload_id = to_str(payload.get("id"));
1533 if !payload_id.is_empty() {
1534 session_id = payload_id;
1535 }
1536 }
1537
1538 let session_date = infer_session_date_from_file(source_file, &record_ts);
1539
1540 let raw_json = compact_json(record);
1541 let base_uid = event_uid(
1542 source_file,
1543 source_generation,
1544 source_line_no,
1545 source_offset,
1546 &raw_json,
1547 "raw",
1548 );
1549
1550 let raw_row = json!({
1551 "source_name": source_name,
1552 "provider": provider_name,
1553 "source_file": source_file,
1554 "source_inode": source_inode,
1555 "source_generation": source_generation,
1556 "source_line_no": source_line_no,
1557 "source_offset": source_offset,
1558 "record_ts": record_ts,
1559 "top_type": top_type,
1560 "session_id": session_id,
1561 "raw_json": raw_json,
1562 "raw_json_hash": raw_hash(&raw_json),
1563 "event_uid": base_uid,
1564 });
1565
1566 let mut error_rows = Vec::<Value>::new();
1567 if event_ts_parse_failed {
1568 error_rows.push(json!({
1569 "source_name": source_name,
1570 "provider": provider_name,
1571 "source_file": source_file,
1572 "source_inode": source_inode,
1573 "source_generation": source_generation,
1574 "source_line_no": source_line_no,
1575 "source_offset": source_offset,
1576 "error_kind": "timestamp_parse_error",
1577 "error_text": format!(
1578 "timestamp is missing or not RFC3339; used {} UTC fallback",
1579 UNPARSEABLE_EVENT_TS
1580 ),
1581 "raw_fragment": truncate_chars(&record_ts, 20_000),
1582 }));
1583 }
1584
1585 let ctx = RecordContext {
1586 source_name,
1587 provider: provider_name,
1588 session_id: &session_id,
1589 session_date: &session_date,
1590 source_file,
1591 source_inode,
1592 source_generation,
1593 source_line_no,
1594 source_offset,
1595 record_ts: &record_ts,
1596 event_ts: &event_ts,
1597 };
1598
1599 let (event_rows, link_rows, tool_rows) = if provider == Provider::Claude {
1600 normalize_claude_event(record, &ctx, &top_type, &base_uid)
1601 } else {
1602 normalize_codex_event(record, &ctx, &top_type, &base_uid, model_hint)
1603 };
1604 let model_hint = resolve_model_hint(&event_rows, provider_name, model_hint);
1605
1606 Ok(NormalizedRecord {
1607 raw_row,
1608 event_rows,
1609 link_rows,
1610 tool_rows,
1611 error_rows,
1612 session_hint: session_id,
1613 model_hint,
1614 })
1615 }
1616
#[cfg(test)]
mod tests {
    use super::{build_link_row, normalize_record, RecordContext};
    use crate::model::NormalizedRecord;
    use serde_json::{json, Map, Value};
    use std::collections::HashMap;

    /// Codex session file whose path encodes the date 2026/02/15.
    const CODEX_FILE_FEB_15: &str =
        "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl";
    /// Claude project file with no date component in its path.
    const CLAUDE_FILE: &str = "/Users/eric/.claude/projects/p1/s1.jsonl";

    /// `(inode, generation, line number, byte offset)` passed through to `normalize_record`.
    type Position = (u64, u32, u64, u64);

    /// Runs `normalize_record` with `provider` as both source name and provider and with empty
    /// session/model hints, panicking with `failure` if normalization fails.
    #[track_caller]
    fn normalize(
        provider: &str,
        record: &Value,
        source_file: &str,
        position: Position,
        failure: &str,
    ) -> NormalizedRecord {
        let (inode, generation, line_no, offset) = position;
        normalize_record(
            record,
            provider,
            provider,
            source_file,
            inode,
            generation,
            line_no,
            offset,
            "",
            "",
        )
        .expect(failure)
    }

    /// First event row of `out` as a JSON object.
    #[track_caller]
    fn first_event(out: &NormalizedRecord) -> &Map<String, Value> {
        out.event_rows[0].as_object().unwrap()
    }

    /// String value stored under `key`; panics if the key is missing or not a string.
    #[track_caller]
    fn str_field<'a>(row: &'a Map<String, Value>, key: &str) -> &'a str {
        row.get(key).unwrap().as_str().unwrap()
    }

    /// Unsigned integer stored under `key`; panics if the key is missing or not a `u64`.
    #[track_caller]
    fn u64_field(row: &Map<String, Value>, key: &str) -> u64 {
        row.get(key).unwrap().as_u64().unwrap()
    }

    #[test]
    fn codex_tool_call_normalization() {
        let record = json!({
            "timestamp": "2026-02-14T02:28:00.000Z",
            "type": "response_item",
            "payload": {
                "type": "function_call",
                "call_id": "call_123",
                "name": "Read",
                "arguments": "{\"path\":\"README.md\"}"
            }
        });

        let out = normalize(
            "codex",
            &record,
            "/Users/eric/.codex/sessions/2026/02/13/session-019c59f9-6389-77a1-a0cb-304eecf935b6.jsonl",
            (123, 1, 42, 1024),
            "codex tool call should normalize",
        );

        assert_eq!(out.event_rows.len(), 1);
        assert_eq!(out.tool_rows.len(), 1);
        assert!(out.error_rows.is_empty());

        let row = first_event(&out);
        assert_eq!(str_field(row, "event_kind"), "tool_call");
        assert_eq!(str_field(row, "tool_name"), "Read");
    }

    #[test]
    fn codex_turn_context_promotes_model_and_turn_id() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:42.191Z",
            "type": "turn_context",
            "payload": {
                "turn_id": "019c5f6a-49bd-7920-ac67-1dd8e33b0e95",
                "model": "gpt-5.3-codex"
            }
        });

        let out = normalize(
            "codex",
            &record,
            CODEX_FILE_FEB_15,
            (1, 1, 1, 1),
            "codex turn context should normalize",
        );

        let row = first_event(&out);
        assert_eq!(str_field(row, "payload_type"), "turn_context");
        assert_eq!(str_field(row, "model"), "gpt-5.3-codex");
        // The turn id is surfaced as both the request id and the item id.
        for key in ["request_id", "item_id"] {
            assert_eq!(str_field(row, key), "019c5f6a-49bd-7920-ac67-1dd8e33b0e95");
        }
    }

    #[test]
    fn codex_token_count_promotes_usage_fields() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:50.838Z",
            "type": "event_msg",
            "payload": {
                "type": "token_count",
                "info": {
                    "last_token_usage": {
                        "input_tokens": 65323,
                        "output_tokens": 445,
                        "cached_input_tokens": 58624
                    }
                },
                "rate_limits": {
                    "limit_name": "GPT-5.3-Codex-Spark",
                    "limit_id": "codex_bengalfox",
                    "plan_type": "pro"
                }
            }
        });

        let out = normalize(
            "codex",
            &record,
            CODEX_FILE_FEB_15,
            (1, 1, 2, 2),
            "codex token count should normalize",
        );

        let row = first_event(&out);
        assert_eq!(str_field(row, "payload_type"), "token_count");
        assert_eq!(u64_field(row, "input_tokens"), 65323);
        assert_eq!(u64_field(row, "output_tokens"), 445);
        assert_eq!(u64_field(row, "cache_read_tokens"), 58624);
        assert_eq!(str_field(row, "model"), "gpt-5.3-codex-spark");
        assert_eq!(str_field(row, "service_tier"), "pro");
        assert!(!str_field(row, "token_usage_json").is_empty());
    }

    #[test]
    fn codex_token_count_alias_codex_maps_to_xhigh() {
        let record = json!({
            "timestamp": "2026-02-15T04:52:55.538Z",
            "type": "event_msg",
            "payload": {
                "type": "token_count",
                "info": {
                    "last_token_usage": {
                        "input_tokens": 72636,
                        "output_tokens": 285,
                        "cached_input_tokens": 70784
                    }
                },
                "rate_limits": {
                    "limit_id": "codex",
                    "limit_name": null,
                    "plan_type": "pro"
                }
            }
        });

        let out = normalize(
            "codex",
            &record,
            CODEX_FILE_FEB_15,
            (1, 1, 4, 4),
            "codex token count alias should normalize",
        );

        assert_eq!(str_field(first_event(&out), "model"), "gpt-5.3-codex-xhigh");
    }

    #[test]
    fn codex_custom_tool_call_promotes_tool_fields() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:50.838Z",
            "type": "response_item",
            "payload": {
                "type": "custom_tool_call",
                "call_id": "call_abc",
                "name": "apply_patch",
                "status": "completed",
                "input": "*** Begin Patch\n*** End Patch\n"
            }
        });

        let out = normalize(
            "codex",
            &record,
            CODEX_FILE_FEB_15,
            (1, 1, 3, 3),
            "codex custom tool call should normalize",
        );

        assert_eq!(out.event_rows.len(), 1);
        assert_eq!(out.tool_rows.len(), 1);

        let row = first_event(&out);
        let expected = [
            ("event_kind", "tool_call"),
            ("tool_call_id", "call_abc"),
            ("tool_name", "apply_patch"),
            ("op_status", "completed"),
        ];
        for (key, value) in expected {
            assert_eq!(str_field(row, key), value);
        }
    }

    #[test]
    fn claude_tool_use_and_result_blocks() {
        let record = json!({
            "type": "assistant",
            "sessionId": "7c666c01-d38e-4658-8650-854ffb5b626e",
            "uuid": "assistant-1",
            "parentUuid": "user-1",
            "requestId": "req-1",
            "timestamp": "2026-01-19T15:58:41.421Z",
            "message": {
                "model": "claude-opus-4-5-20251101",
                "role": "assistant",
                "usage": {
                    "input_tokens": 9,
                    "output_tokens": 5,
                    "cache_creation_input_tokens": 19630,
                    "cache_read_input_tokens": 0,
                    "service_tier": "standard"
                },
                "content": [
                    {
                        "type": "tool_use",
                        "id": "toolu_1",
                        "name": "WebFetch",
                        "input": {"url": "https://example.com"}
                    },
                    {
                        "type": "text",
                        "text": "done"
                    }
                ]
            }
        });

        let out = normalize(
            "claude",
            &record,
            CLAUDE_FILE,
            (55, 2, 10, 100),
            "claude event should normalize",
        );

        // One event per content block; only the tool_use block yields a tool row.
        assert_eq!(out.event_rows.len(), 2);
        assert_eq!(out.tool_rows.len(), 1);

        let first = first_event(&out);
        assert_eq!(str_field(first, "event_kind"), "tool_call");
        assert_eq!(str_field(first, "provider"), "claude");
        assert!(out.error_rows.is_empty());
    }

    #[test]
    fn invalid_timestamp_uses_epoch_and_emits_timestamp_parse_error() {
        let record = json!({
            "timestamp": "not-a-timestamp",
            "type": "response_item",
            "payload": {
                "type": "function_call",
                "call_id": "call_bad_ts",
                "name": "Read",
                "arguments": "{}"
            }
        });

        // The path has no date directories, so the session date falls back with the timestamp.
        let out = normalize(
            "codex",
            &record,
            "/Users/eric/.codex/sessions/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
            (9, 2, 7, 99),
            "codex event with invalid timestamp should normalize",
        );

        let event_row = first_event(&out);
        assert_eq!(str_field(event_row, "event_ts"), "1970-01-01 00:00:00.000");
        assert_eq!(str_field(event_row, "session_date"), "1970-01-01");

        assert_eq!(out.error_rows.len(), 1);
        let error = out.error_rows[0].as_object().unwrap();
        assert_eq!(str_field(error, "error_kind"), "timestamp_parse_error");
        assert_eq!(str_field(error, "raw_fragment"), "not-a-timestamp");
    }

    #[test]
    fn invalid_timestamp_preserves_session_date_from_source_path() {
        let record = json!({
            "timestamp": "still-not-a-timestamp",
            "type": "response_item",
            "payload": {
                "type": "function_call",
                "call_id": "call_bad_ts",
                "name": "Read",
                "arguments": "{}"
            }
        });

        let out = normalize(
            "codex",
            &record,
            "/Users/eric/.codex/sessions/2026/02/16/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
            (11, 4, 12, 144),
            "codex event should normalize while preserving session date from path",
        );

        let event_row = first_event(&out);
        assert_eq!(str_field(event_row, "event_ts"), "1970-01-01 00:00:00.000");
        assert_eq!(str_field(event_row, "session_date"), "2026-02-16");
        assert_eq!(out.error_rows.len(), 1);
    }

    #[test]
    fn unknown_provider_is_rejected() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:42.191Z",
            "type": "turn_context",
        });

        let err = normalize_record(
            &record,
            "unknown",
            "unknown",
            "/tmp/sessions/session-1.jsonl",
            1,
            1,
            1,
            1,
            "",
            "",
        )
        .expect_err("unknown provider should be rejected");

        assert!(
            err.to_string().contains("unsupported provider"),
            "unexpected error: {err:#}"
        );
    }

    #[test]
    fn claude_links_split_event_uids_from_external_ids() {
        let record = json!({
            "type": "assistant",
            "sessionId": "7c666c01-d38e-4658-8650-854ffb5b626e",
            "uuid": "assistant-2",
            "parentUuid": "user-parent-2",
            "toolUseID": "toolu_42",
            "sourceToolAssistantUUID": "assistant-root-1",
            "requestId": "req-2",
            "timestamp": "2026-01-19T15:59:41.421Z",
            "message": {
                "role": "assistant",
                "content": "done"
            }
        });

        let out = normalize(
            "claude",
            &record,
            CLAUDE_FILE,
            (55, 2, 11, 101),
            "claude assistant record should normalize",
        );

        assert_eq!(out.link_rows.len(), 3);

        let mut by_type: HashMap<String, Map<String, Value>> = HashMap::new();
        for row in &out.link_rows {
            let obj = row.as_object().expect("link row object");
            let link_type = obj
                .get("link_type")
                .and_then(Value::as_str)
                .expect("link_type");
            by_type.insert(link_type.to_string(), obj.clone());
        }

        // Each link targets an external id and leaves the event-uid target empty.
        let expected = [
            ("parent_uuid", "user-parent-2"),
            ("tool_use_id", "toolu_42"),
            ("source_tool_assistant", "assistant-root-1"),
        ];
        for (link_type, external_id) in expected {
            let link = by_type
                .get(link_type)
                .unwrap_or_else(|| panic!("{link_type} link"));
            assert_eq!(str_field(link, "linked_external_id"), external_id);
            assert_eq!(str_field(link, "linked_event_uid"), "");
        }
    }

    #[test]
    fn codex_compacted_parent_link_uses_event_uid_target() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:50.838Z",
            "type": "compacted",
            "payload": {
                "replacement_history": [
                    {
                        "type": "message",
                        "role": "assistant",
                        "content": [
                            {"type": "text", "text": "hello"}
                        ]
                    }
                ]
            }
        });

        let out = normalize(
            "codex",
            &record,
            CODEX_FILE_FEB_15,
            (1, 1, 12, 12),
            "compacted record should normalize",
        );

        let compacted_uid = out.event_rows[0]
            .get("event_uid")
            .and_then(Value::as_str)
            .expect("compacted event uid");
        let link = out.link_rows[0].as_object().expect("compacted link");

        assert_eq!(str_field(link, "link_type"), "compacted_parent");
        assert_eq!(str_field(link, "linked_event_uid"), compacted_uid);
        assert_eq!(str_field(link, "linked_external_id"), "");
    }

    #[test]
    fn codex_unknown_payload_type_is_canonicalized() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:50.838Z",
            "type": "response_item",
            "payload": {
                "type": "brand_new_payload_type",
                "body": "x"
            }
        });

        let out = normalize(
            "codex",
            &record,
            CODEX_FILE_FEB_15,
            (1, 1, 5, 5),
            "record should normalize",
        );

        let row = first_event(&out);
        assert_eq!(str_field(row, "event_kind"), "unknown");
        assert_eq!(str_field(row, "payload_type"), "unknown");
    }

    #[test]
    fn codex_event_msg_known_operational_payload_type_is_preserved() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:50.838Z",
            "type": "event_msg",
            "payload": {
                "type": "task_started",
                "status": "in_progress"
            }
        });

        let out = normalize(
            "codex",
            &record,
            CODEX_FILE_FEB_15,
            (1, 1, 6, 6),
            "record should normalize",
        );

        let row = first_event(&out);
        assert_eq!(str_field(row, "event_kind"), "event_msg");
        assert_eq!(str_field(row, "payload_type"), "task_started");
    }

    #[test]
    fn claude_progress_unknown_payload_type_moves_to_unknown_and_preserves_op_kind() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:50.838Z",
            "type": "progress",
            "sessionId": "7c666c01-d38e-4658-8650-854ffb5b626e",
            "data": {
                "type": "provider_extension_step"
            },
            "status": "ok"
        });

        let out = normalize(
            "claude",
            &record,
            CLAUDE_FILE,
            (1, 1, 6, 6),
            "record should normalize",
        );

        let row = first_event(&out);
        assert_eq!(str_field(row, "event_kind"), "progress");
        assert_eq!(str_field(row, "payload_type"), "unknown");
        // The original provider value stays available under `op_kind`.
        assert_eq!(str_field(row, "op_kind"), "provider_extension_step");
    }

    #[test]
    fn link_type_is_canonicalized_to_domain() {
        let ctx = RecordContext {
            source_name: "codex",
            provider: "codex",
            session_id: "s1",
            session_date: "2026-02-15",
            source_file: "/tmp/s1.jsonl",
            source_inode: 1,
            source_generation: 1,
            source_line_no: 1,
            source_offset: 1,
            record_ts: "2026-02-15T03:50:50.838Z",
            event_ts: "2026-02-15 03:50:50.838",
        };

        let link = build_link_row(&ctx, "e1", "e2", "", "new_link_type", "{}");
        let link_obj = link.as_object().unwrap();
        assert_eq!(str_field(link_obj, "link_type"), "unknown");
    }
}