Skip to content

crates/moraine-ingest-core/src/normalize.rs


     1 use crate::model::NormalizedRecord;
     2 use anyhow::{anyhow, Result};
     3 use chrono::{DateTime, Utc};
     4 use regex::Regex;
     5 use serde_json::{json, Map, Value};
     6 use sha2::{Digest, Sha256};
     7 use std::sync::OnceLock;
     8 use std::time::{SystemTime, UNIX_EPOCH};
     9  
/// Maximum number of characters kept for extracted text and tool
/// input/output strings before they are stored in a row.
const TEXT_LIMIT: usize = 200_000;
/// Maximum number of characters kept in the short preview fields.
const PREVIEW_LIMIT: usize = 320;
/// Placeholder event timestamp used when a record timestamp fails to parse.
const UNPARSEABLE_EVENT_TS: &str = "1970-01-01 00:00:00.000";
    13  
    14 fn session_id_re() -> &'static Regex {
    15     static SESSION_ID_RE: OnceLock<Regex> = OnceLock::new();
    16     SESSION_ID_RE.get_or_init(|| {
    17         Regex::new(
    18             r"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})$",
    19         )
    20         .expect("valid session id regex")
    21     })
    22 }
    23  
    24 fn session_date_re() -> &'static Regex {
    25     static SESSION_DATE_RE: OnceLock<Regex> = OnceLock::new();
    26     SESSION_DATE_RE.get_or_init(|| {
    27         Regex::new(r"/(?:sessions|projects)/(\d{4})/(\d{2})/(\d{2})/")
    28             .expect("valid session date regex")
    29     })
    30 }
    31  
    32 fn to_str(value: Option<&Value>) -> String {
    33     match value {
    34         None | Some(Value::Null) => String::new(),
    35         Some(Value::String(s)) => s.clone(),
    36         Some(other) => other.to_string(),
    37     }
    38 }
    39  
    40 fn to_u32(value: Option<&Value>) -> u32 {
    41     match value {
    42         Some(Value::Number(n)) => n.as_u64().unwrap_or(0).min(u32::MAX as u64) as u32,
    43         Some(Value::String(s)) => s.parse::<u64>().unwrap_or(0).min(u32::MAX as u64) as u32,
    44         _ => 0,
    45     }
    46 }
    47  
    48 fn to_u16(value: Option<&Value>) -> u16 {
    49     to_u32(value).min(u16::MAX as u32) as u16
    50 }
    51  
    52 fn to_u8_bool(value: Option<&Value>) -> u8 {
    53     match value {
    54         Some(Value::Bool(v)) => {
    55             if *v {
    56                 1
    57             } else {
    58                 0
    59             }
    60         }
    61         Some(Value::Number(v)) => {
    62             if v.as_i64().unwrap_or(0) != 0 {
    63                 1
    64             } else {
    65                 0
    66             }
    67         }
    68         Some(Value::String(s)) => {
    69             let lower = s.to_ascii_lowercase();
    70             if lower == "true" || lower == "1" {
    71                 1
    72             } else {
    73                 0
    74             }
    75         }
    76         _ => 0,
    77     }
    78 }
    79  
/// Providers whose records this module knows how to handle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Provider {
    /// Identified by the string `"codex"`.
    Codex,
    /// Identified by the string `"claude"`.
    Claude,
}
    85  
    86 impl Provider {
    87     fn parse(raw: &str) -> Result<Self> {
    88         match raw.trim().to_ascii_lowercase().as_str() {
    89             "codex" => Ok(Self::Codex),
    90             "claude" => Ok(Self::Claude),
    91             _ => Err(anyhow!(
    92                 "unsupported provider `{}`; expected one of: codex, claude",
    93                 raw.trim()
    94             )),
    95         }
    96     }
    97  
    98     fn as_str(self) -> &'static str {
    99         match self {
   100             Self::Codex => "codex",
   101             Self::Claude => "claude",
   102         }
   103     }
   104 }
   105  
   106 fn canonicalize_model(provider: &str, raw_model: &str) -> String {
   107     let mut model = raw_model.trim().to_ascii_lowercase();
   108     if model.is_empty() {
   109         return String::new();
   110     }
   111  
   112     model = model.replace(' ', "-");
   113  
   114     if provider == "codex" && model == "codex" {
   115         return "gpt-5.3-codex-xhigh".to_string();
   116     }
   117  
   118     model
   119 }
   120  
   121 fn resolve_model_hint(event_rows: &[Value], provider: &str, fallback: &str) -> String {
   122     for row in event_rows.iter().rev() {
   123         if let Some(model) = row.get("model").and_then(Value::as_str) {
   124             let normalized = canonicalize_model(provider, model);
   125             if !normalized.is_empty() {
   126                 return normalized;
   127             }
   128         }
   129     }
   130  
   131     canonicalize_model(provider, fallback)
   132 }
   133  
   134 fn compact_json(value: &Value) -> String {
   135     serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string())
   136 }
   137  
   138 fn truncate_chars(input: &str, max_chars: usize) -> String {
   139     if input.chars().count() <= max_chars {
   140         input.to_string()
   141     } else {
   142         input.chars().take(max_chars).collect()
   143     }
   144 }
   145  
   146 fn extract_message_text(content: &Value) -> String {
   147     fn walk(node: &Value, out: &mut Vec<String>) {
   148         match node {
   149             Value::String(s) => {
   150                 if !s.trim().is_empty() {
   151                     out.push(s.clone());
   152                 }
   153             }
   154             Value::Array(items) => {
   155                 for item in items {
   156                     walk(item, out);
   157                 }
   158             }
   159             Value::Object(map) => {
   160                 for key in ["text", "message", "output", "thinking", "summary"] {
   161                     if let Some(Value::String(s)) = map.get(key) {
   162                         if !s.trim().is_empty() {
   163                             out.push(s.clone());
   164                         }
   165                     }
   166                 }
   167  
   168                 for key in ["content", "text_elements", "input"] {
   169                     if let Some(value) = map.get(key) {
   170                         walk(value, out);
   171                     }
   172                 }
   173             }
   174             _ => {}
   175         }
   176     }
   177  
   178     let mut chunks = Vec::<String>::new();
   179     walk(content, &mut chunks);
   180     truncate_chars(&chunks.join("\n"), TEXT_LIMIT)
   181 }
   182  
   183 fn extract_content_types(content: &Value) -> Vec<String> {
   184     if let Value::Array(items) = content {
   185         let mut out = Vec::<String>::new();
   186         for item in items {
   187             if let Some(t) = item.get("type").and_then(|v| v.as_str()) {
   188                 if !t.is_empty() {
   189                     out.push(t.to_string());
   190                 }
   191             }
   192         }
   193         out.sort();
   194         out.dedup();
   195         return out;
   196     }
   197     Vec::new()
   198 }
   199  
   200 pub fn infer_session_id_from_file(source_file: &str) -> String {
   201     let stem = std::path::Path::new(source_file)
   202         .file_stem()
   203         .and_then(|s| s.to_str())
   204         .unwrap_or_default();
   205  
   206     session_id_re()
   207         .captures(stem)
   208         .and_then(|cap| cap.get(1).map(|m| m.as_str().to_string()))
   209         .unwrap_or_default()
   210 }
   211  
   212 pub fn infer_session_date_from_file(source_file: &str, record_ts: &str) -> String {
   213     if let Some(cap) = session_date_re().captures(source_file) {
   214         return format!("{}-{}-{}", &cap[1], &cap[2], &cap[3]);
   215     }
   216  
   217     parse_record_ts(record_ts)
   218         .map(|dt| dt.format("%Y-%m-%d").to_string())
   219         .unwrap_or_else(|| "1970-01-01".to_string())
   220 }
   221  
   222 fn parse_record_ts(record_ts: &str) -> Option<DateTime<Utc>> {
   223     DateTime::parse_from_rfc3339(record_ts)
   224         .ok()
   225         .map(|dt| dt.with_timezone(&Utc))
   226 }
   227  
   228 fn format_event_ts(dt: &DateTime<Utc>) -> String {
   229     dt.format("%Y-%m-%d %H:%M:%S%.3f").to_string()
   230 }
   231  
   232 fn parse_event_ts(record_ts: &str) -> (String, bool) {
   233     if let Some(dt) = parse_record_ts(record_ts) {
   234         return (format_event_ts(&dt), false);
   235     }
   236  
   237     (UNPARSEABLE_EVENT_TS.to_string(), true)
   238 }
   239  
   240 fn event_kind_in_domain(value: &str) -> bool {
   241     matches!(
   242         value,
   243         "session_meta"
   244             | "turn_context"
   245             | "message"
   246             | "tool_call"
   247             | "tool_result"
   248             | "reasoning"
   249             | "event_msg"
   250             | "compacted_raw"
   251             | "progress"
   252             | "system"
   253             | "summary"
   254             | "queue_operation"
   255             | "file_history_snapshot"
   256             | "unknown"
   257     )
   258 }
   259  
   260 fn payload_type_in_domain(value: &str) -> bool {
   261     matches!(
   262         value,
   263         "session_meta"
   264             | "turn_context"
   265             | "message"
   266             | "function_call"
   267             | "function_call_output"
   268             | "custom_tool_call"
   269             | "custom_tool_call_output"
   270             | "web_search_call"
   271             | "reasoning"
   272             | "response_item"
   273             | "event_msg"
   274             | "user_message"
   275             | "agent_message"
   276             | "agent_reasoning"
   277             | "token_count"
   278             | "task_started"
   279             | "task_complete"
   280             | "turn_aborted"
   281             | "item_completed"
   282             | "search_results_received"
   283             | "compacted"
   284             | "thinking"
   285             | "tool_use"
   286             | "tool_result"
   287             | "text"
   288             | "progress"
   289             | "system"
   290             | "summary"
   291             | "queue-operation"
   292             | "file-history-snapshot"
   293             | "unknown"
   294     )
   295 }
   296  
   297 fn link_type_in_domain(value: &str) -> bool {
   298     matches!(
   299         value,
   300         "parent_event"
   301             | "compacted_parent"
   302             | "parent_uuid"
   303             | "tool_use_id"
   304             | "source_tool_assistant"
   305             | "unknown"
   306     )
   307 }
   308  
   309 fn canonicalize_event_kind(value: &str) -> &str {
   310     if event_kind_in_domain(value) {
   311         value
   312     } else {
   313         "unknown"
   314     }
   315 }
   316  
   317 fn canonicalize_payload_type(value: &str) -> &str {
   318     if payload_type_in_domain(value) {
   319         value
   320     } else {
   321         "unknown"
   322     }
   323 }
   324  
   325 fn canonicalize_link_type(value: &str) -> &str {
   326     if link_type_in_domain(value) {
   327         value
   328     } else {
   329         "unknown"
   330     }
   331 }
   332  
   333 fn event_uid(
   334     source_file: &str,
   335     source_generation: u32,
   336     source_line_no: u64,
   337     source_offset: u64,
   338     record_fingerprint: &str,
   339     suffix: &str,
   340 ) -> String {
   341     let material = format!(
   342         "{}|{}|{}|{}|{}|{}",
   343         source_file, source_generation, source_line_no, source_offset, record_fingerprint, suffix
   344     );
   345  
   346     let mut hasher = Sha256::new();
   347     hasher.update(material.as_bytes());
   348     format!("{:x}", hasher.finalize())
   349 }
   350  
   351 fn event_version() -> u64 {
   352     let now = SystemTime::now()
   353         .duration_since(UNIX_EPOCH)
   354         .unwrap_or_default()
   355         .as_millis();
   356     now as u64
   357 }
   358  
   359 fn raw_hash(raw_json: &str) -> u64 {
   360     let mut hasher = Sha256::new();
   361     hasher.update(raw_json.as_bytes());
   362     let digest = hasher.finalize();
   363     let hex = format!("{:x}", digest);
   364     u64::from_str_radix(&hex[..16], 16).unwrap_or(0)
   365 }
   366  
   367 fn io_hash(input_json: &str, output_json: &str) -> u64 {
   368     raw_hash(&format!("{}\n{}", input_json, output_json))
   369 }
   370  
/// Borrowed per-record metadata shared by the row builders in this module.
///
/// The event, link and tool row builders copy these values into the rows
/// they emit.
struct RecordContext<'a> {
    source_name: &'a str,
    provider: &'a str,
    session_id: &'a str,
    session_date: &'a str,
    // `source_file`, `source_generation` and `source_line_no` are also joined
    // with `:` to form the `source_ref` field of event and tool rows.
    source_file: &'a str,
    source_inode: u64,
    source_generation: u32,
    source_line_no: u64,
    source_offset: u64,
    // Timestamp string as found on the record.
    record_ts: &'a str,
    // NOTE(review): presumably the string produced by `parse_event_ts` —
    // confirm against the caller.
    event_ts: &'a str,
}
   384  
   385 fn base_event_obj(
   386     ctx: &RecordContext<'_>,
   387     event_uid: &str,
   388     event_kind: &str,
   389     payload_type: &str,
   390     actor_kind: &str,
   391     text_content: &str,
   392     payload_json: &str,
   393 ) -> Map<String, Value> {
   394     let text_content = truncate_chars(text_content, TEXT_LIMIT);
   395     let event_kind = canonicalize_event_kind(event_kind);
   396     let payload_type = canonicalize_payload_type(payload_type);
   397     let mut obj = Map::<String, Value>::new();
   398     obj.insert(
   399         "event_uid".to_string(),
   400         Value::String(event_uid.to_string()),
   401     );
   402     obj.insert(
   403         "session_id".to_string(),
   404         Value::String(ctx.session_id.to_string()),
   405     );
   406     obj.insert(
   407         "session_date".to_string(),
   408         Value::String(ctx.session_date.to_string()),
   409     );
   410     obj.insert(
   411         "source_name".to_string(),
   412         Value::String(ctx.source_name.to_string()),
   413     );
   414     obj.insert(
   415         "provider".to_string(),
   416         Value::String(ctx.provider.to_string()),
   417     );
   418     obj.insert(
   419         "source_file".to_string(),
   420         Value::String(ctx.source_file.to_string()),
   421     );
   422     obj.insert("source_inode".to_string(), json!(ctx.source_inode));
   423     obj.insert(
   424         "source_generation".to_string(),
   425         json!(ctx.source_generation),
   426     );
   427     obj.insert("source_line_no".to_string(), json!(ctx.source_line_no));
   428     obj.insert("source_offset".to_string(), json!(ctx.source_offset));
   429     obj.insert(
   430         "source_ref".to_string(),
   431         Value::String(format!(
   432             "{}:{}:{}",
   433             ctx.source_file, ctx.source_generation, ctx.source_line_no
   434         )),
   435     );
   436     obj.insert(
   437         "record_ts".to_string(),
   438         Value::String(ctx.record_ts.to_string()),
   439     );
   440     obj.insert(
   441         "event_ts".to_string(),
   442         Value::String(ctx.event_ts.to_string()),
   443     );
   444     obj.insert(
   445         "event_kind".to_string(),
   446         Value::String(event_kind.to_string()),
   447     );
   448     obj.insert(
   449         "actor_kind".to_string(),
   450         Value::String(actor_kind.to_string()),
   451     );
   452     obj.insert(
   453         "payload_type".to_string(),
   454         Value::String(payload_type.to_string()),
   455     );
   456     obj.insert("op_kind".to_string(), Value::String(String::new()));
   457     obj.insert("op_status".to_string(), Value::String(String::new()));
   458     obj.insert("request_id".to_string(), Value::String(String::new()));
   459     obj.insert("trace_id".to_string(), Value::String(String::new()));
   460     obj.insert("turn_index".to_string(), json!(0u32));
   461     obj.insert("item_id".to_string(), Value::String(String::new()));
   462     obj.insert("tool_call_id".to_string(), Value::String(String::new()));
   463     obj.insert(
   464         "parent_tool_call_id".to_string(),
   465         Value::String(String::new()),
   466     );
   467     obj.insert("origin_event_id".to_string(), Value::String(String::new()));
   468     obj.insert(
   469         "origin_tool_call_id".to_string(),
   470         Value::String(String::new()),
   471     );
   472     obj.insert("tool_name".to_string(), Value::String(String::new()));
   473     obj.insert("tool_phase".to_string(), Value::String(String::new()));
   474     obj.insert("tool_error".to_string(), json!(0u8));
   475     obj.insert("agent_run_id".to_string(), Value::String(String::new()));
   476     obj.insert("agent_label".to_string(), Value::String(String::new()));
   477     obj.insert("coord_group_id".to_string(), Value::String(String::new()));
   478     obj.insert(
   479         "coord_group_label".to_string(),
   480         Value::String(String::new()),
   481     );
   482     obj.insert("is_substream".to_string(), json!(0u8));
   483     obj.insert("model".to_string(), Value::String(String::new()));
   484     obj.insert("input_tokens".to_string(), json!(0u32));
   485     obj.insert("output_tokens".to_string(), json!(0u32));
   486     obj.insert("cache_read_tokens".to_string(), json!(0u32));
   487     obj.insert("cache_write_tokens".to_string(), json!(0u32));
   488     obj.insert("latency_ms".to_string(), json!(0u32));
   489     obj.insert("retry_count".to_string(), json!(0u16));
   490     obj.insert("service_tier".to_string(), Value::String(String::new()));
   491     obj.insert("content_types".to_string(), json!([]));
   492     obj.insert("has_reasoning".to_string(), json!(0u8));
   493     obj.insert(
   494         "text_content".to_string(),
   495         Value::String(text_content.clone()),
   496     );
   497     obj.insert(
   498         "text_preview".to_string(),
   499         Value::String(truncate_chars(&text_content, PREVIEW_LIMIT)),
   500     );
   501     obj.insert(
   502         "payload_json".to_string(),
   503         Value::String(payload_json.to_string()),
   504     );
   505     obj.insert("token_usage_json".to_string(), Value::String(String::new()));
   506     obj.insert("event_version".to_string(), json!(event_version()));
   507     obj
   508 }
   509  
   510 fn build_link_row(
   511     ctx: &RecordContext<'_>,
   512     event_uid: &str,
   513     linked_event_uid: &str,
   514     linked_external_id: &str,
   515     link_type: &str,
   516     metadata_json: &str,
   517 ) -> Value {
   518     let link_type = canonicalize_link_type(link_type);
   519     json!({
   520         "event_uid": event_uid,
   521         "linked_event_uid": linked_event_uid,
   522         "linked_external_id": linked_external_id,
   523         "link_type": link_type,
   524         "session_id": ctx.session_id,
   525         "provider": ctx.provider,
   526         "source_name": ctx.source_name,
   527         "metadata_json": metadata_json,
   528         "event_version": event_version(),
   529     })
   530 }
   531  
   532 fn build_event_link_row(
   533     ctx: &RecordContext<'_>,
   534     event_uid: &str,
   535     linked_event_uid: &str,
   536     link_type: &str,
   537     metadata_json: &str,
   538 ) -> Value {
   539     build_link_row(
   540         ctx,
   541         event_uid,
   542         linked_event_uid,
   543         "",
   544         link_type,
   545         metadata_json,
   546     )
   547 }
   548  
   549 fn build_external_link_row(
   550     ctx: &RecordContext<'_>,
   551     event_uid: &str,
   552     linked_external_id: &str,
   553     link_type: &str,
   554     metadata_json: &str,
   555 ) -> Value {
   556     build_link_row(
   557         ctx,
   558         event_uid,
   559         "",
   560         linked_external_id,
   561         link_type,
   562         metadata_json,
   563     )
   564 }
   565  
   566 fn build_tool_row(
   567     ctx: &RecordContext<'_>,
   568     event_uid: &str,
   569     tool_call_id: &str,
   570     parent_tool_call_id: &str,
   571     tool_name: &str,
   572     tool_phase: &str,
   573     tool_error: u8,
   574     input_json: &str,
   575     output_json: &str,
   576     output_text: &str,
   577 ) -> Value {
   578     let input_json = truncate_chars(input_json, TEXT_LIMIT);
   579     let output_json = truncate_chars(output_json, TEXT_LIMIT);
   580     let output_text = truncate_chars(output_text, TEXT_LIMIT);
   581  
   582     json!({
   583         "event_uid": event_uid,
   584         "session_id": ctx.session_id,
   585         "provider": ctx.provider,
   586         "source_name": ctx.source_name,
   587         "tool_call_id": tool_call_id,
   588         "parent_tool_call_id": parent_tool_call_id,
   589         "tool_name": tool_name,
   590         "tool_phase": tool_phase,
   591         "tool_error": tool_error,
   592         "input_json": input_json,
   593         "output_json": output_json,
   594         "output_text": output_text,
   595         "input_bytes": input_json.len() as u32,
   596         "output_bytes": output_json.len() as u32,
   597         "input_preview": truncate_chars(&input_json, PREVIEW_LIMIT),
   598         "output_preview": truncate_chars(&output_text, PREVIEW_LIMIT),
   599         "io_hash": io_hash(&input_json, &output_json),
   600         "source_ref": format!("{}:{}:{}", ctx.source_file, ctx.source_generation, ctx.source_line_no),
   601         "event_version": event_version(),
   602     })
   603 }
   604  
   605 fn normalize_codex_event(
   606     record: &Value,
   607     ctx: &RecordContext<'_>,
   608     top_type: &str,
   609     base_uid: &str,
   610     model_hint: &str,
   611 ) -> (Vec<Value>, Vec<Value>, Vec<Value>) {
   612     let mut events = Vec::<Value>::new();
   613     let mut links = Vec::<Value>::new();
   614     let mut tools = Vec::<Value>::new();
   615  
   616     let payload = record.get("payload").cloned().unwrap_or(Value::Null);
   617     let payload_obj = payload.as_object().cloned().unwrap_or_else(Map::new);
   618     let payload_json = compact_json(&Value::Object(payload_obj.clone()));
   619  
   620     let push_parent_link = |links: &mut Vec<Value>, uid: &str, parent: &str| {
   621         if !parent.is_empty() {
   622             links.push(build_external_link_row(
   623                 ctx,
   624                 uid,
   625                 parent,
   626                 "parent_event",
   627                 "{}",
   628             ));
   629         }
   630     };
   631  
   632     match top_type {
   633         "session_meta" => {
   634             let mut row = base_event_obj(
   635                 ctx,
   636                 base_uid,
   637                 "session_meta",
   638                 "session_meta",
   639                 "system",
   640                 "",
   641                 &payload_json,
   642             );
   643             row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
   644             events.push(Value::Object(row));
   645         }
   646         "turn_context" => {
   647             let mut row = base_event_obj(
   648                 ctx,
   649                 base_uid,
   650                 "turn_context",
   651                 "turn_context",
   652                 "system",
   653                 "",
   654                 &payload_json,
   655             );
   656             row.insert(
   657                 "turn_index".to_string(),
   658                 json!(to_u32(payload_obj.get("turn_id"))),
   659             );
   660             let turn_id = to_str(payload_obj.get("turn_id"));
   661             if !turn_id.is_empty() {
   662                 row.insert("request_id".to_string(), json!(turn_id.clone()));
   663                 row.insert("item_id".to_string(), json!(turn_id));
   664             }
   665             let model = canonicalize_model("codex", &to_str(payload_obj.get("model")));
   666             if !model.is_empty() {
   667                 row.insert("model".to_string(), json!(model));
   668             }
   669             events.push(Value::Object(row));
   670         }
   671         "response_item" => {
   672             let payload_type = to_str(payload_obj.get("type"));
   673             match payload_type.as_str() {
   674                 "message" => {
   675                     let role = to_str(payload_obj.get("role"));
   676                     let content = payload_obj.get("content").cloned().unwrap_or(Value::Null);
   677                     let text = extract_message_text(&content);
   678                     let mut row = base_event_obj(
   679                         ctx,
   680                         base_uid,
   681                         "message",
   682                         "message",
   683                         if role.is_empty() {
   684                             "assistant"
   685                         } else {
   686                             role.as_str()
   687                         },
   688                         &text,
   689                         &payload_json,
   690                     );
   691                     row.insert(
   692                         "content_types".to_string(),
   693                         json!(extract_content_types(&content)),
   694                     );
   695                     row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
   696                     row.insert(
   697                         "op_status".to_string(),
   698                         json!(to_str(payload_obj.get("phase"))),
   699                     );
   700                     events.push(Value::Object(row));
   701                 }
   702                 "function_call" => {
   703                     let args = to_str(payload_obj.get("arguments"));
   704                     let call_id = to_str(payload_obj.get("call_id"));
   705                     let name = to_str(payload_obj.get("name"));
   706                     let mut row = base_event_obj(
   707                         ctx,
   708                         base_uid,
   709                         "tool_call",
   710                         "function_call",
   711                         "assistant",
   712                         &args,
   713                         &payload_json,
   714                     );
   715                     row.insert("tool_call_id".to_string(), json!(call_id.clone()));
   716                     row.insert("tool_name".to_string(), json!(name.clone()));
   717                     events.push(Value::Object(row));
   718  
   719                     tools.push(build_tool_row(
   720                         ctx, base_uid, &call_id, "", &name, "request", 0, &args, "", "",
   721                     ));
   722                 }
   723                 "function_call_output" => {
   724                     let output = to_str(payload_obj.get("output"));
   725                     let call_id = to_str(payload_obj.get("call_id"));
   726                     let mut row = base_event_obj(
   727                         ctx,
   728                         base_uid,
   729                         "tool_result",
   730                         "function_call_output",
   731                         "tool",
   732                         &output,
   733                         &payload_json,
   734                     );
   735                     row.insert("tool_call_id".to_string(), json!(call_id.clone()));
   736                     events.push(Value::Object(row));
   737  
   738                     tools.push(build_tool_row(
   739                         ctx,
   740                         base_uid,
   741                         &call_id,
   742                         "",
   743                         "",
   744                         "response",
   745                         0,
   746                         "",
   747                         &compact_json(payload_obj.get("output").unwrap_or(&Value::Null)),
   748                         &output,
   749                     ));
   750                 }
   751                 "custom_tool_call" => {
   752                     let input = to_str(payload_obj.get("input"));
   753                     let call_id = to_str(payload_obj.get("call_id"));
   754                     let name = to_str(payload_obj.get("name"));
   755                     let status = to_str(payload_obj.get("status"));
   756                     let mut row = base_event_obj(
   757                         ctx,
   758                         base_uid,
   759                         "tool_call",
   760                         "custom_tool_call",
   761                         "assistant",
   762                         &input,
   763                         &payload_json,
   764                     );
   765                     row.insert("tool_call_id".to_string(), json!(call_id.clone()));
   766                     row.insert("tool_name".to_string(), json!(name.clone()));
   767                     row.insert("op_status".to_string(), json!(status));
   768                     events.push(Value::Object(row));
   769  
   770                     tools.push(build_tool_row(
   771                         ctx, base_uid, &call_id, "", &name, "request", 0, &input, "", "",
   772                     ));
   773                 }
   774                 "custom_tool_call_output" => {
   775                     let output = to_str(payload_obj.get("output"));
   776                     let call_id = to_str(payload_obj.get("call_id"));
   777                     let status = to_str(payload_obj.get("status"));
   778                     let output_json = serde_json::from_str::<Value>(&output)
   779                         .map(|parsed| compact_json(&parsed))
   780                         .unwrap_or_else(|_| {
   781                             compact_json(payload_obj.get("output").unwrap_or(&Value::Null))
   782                         });
   783  
   784                     let mut row = base_event_obj(
   785                         ctx,
   786                         base_uid,
   787                         "tool_result",
   788                         "custom_tool_call_output",
   789                         "tool",
   790                         &output,
   791                         &payload_json,
   792                     );
   793                     row.insert("tool_call_id".to_string(), json!(call_id.clone()));
   794                     row.insert("op_status".to_string(), json!(status));
   795                     events.push(Value::Object(row));
   796  
   797                     tools.push(build_tool_row(
   798                         ctx,
   799                         base_uid,
   800                         &call_id,
   801                         "",
   802                         "",
   803                         "response",
   804                         0,
   805                         "",
   806                         &output_json,
   807                         &output,
   808                     ));
   809                 }
   810                 "web_search_call" => {
   811                     let action = payload_obj.get("action").cloned().unwrap_or(Value::Null);
   812                     let action_type = to_str(action.get("type"));
   813                     let status = to_str(payload_obj.get("status"));
   814                     let mut row = base_event_obj(
   815                         ctx,
   816                         base_uid,
   817                         "tool_call",
   818                         "web_search_call",
   819                         "assistant",
   820                         &extract_message_text(&action),
   821                         &payload_json,
   822                     );
   823                     row.insert("tool_name".to_string(), json!("web_search"));
   824                     row.insert("op_kind".to_string(), json!(action_type));
   825                     row.insert("op_status".to_string(), json!(status.clone()));
   826                     row.insert("tool_phase".to_string(), json!(status));
   827                     events.push(Value::Object(row));
   828                 }
   829                 "reasoning" => {
   830                     let summary = payload_obj.get("summary").cloned().unwrap_or(Value::Null);
   831                     let mut row = base_event_obj(
   832                         ctx,
   833                         base_uid,
   834                         "reasoning",
   835                         "reasoning",
   836                         "assistant",
   837                         &extract_message_text(&summary),
   838                         &payload_json,
   839                     );
   840                     row.insert("has_reasoning".to_string(), json!(1u8));
   841                     row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
   842                     events.push(Value::Object(row));
   843                 }
   844                 _ => {
   845                     events.push(Value::Object(base_event_obj(
   846                         ctx,
   847                         base_uid,
   848                         "unknown",
   849                         if payload_type.is_empty() {
   850                             "response_item"
   851                         } else {
   852                             payload_type.as_str()
   853                         },
   854                         "system",
   855                         &extract_message_text(&payload),
   856                         &payload_json,
   857                     )));
   858                 }
   859             }
   860         }
   861         "event_msg" => {
   862             let payload_type = to_str(payload_obj.get("type"));
   863             let actor = match payload_type.as_str() {
   864                 "user_message" => "user",
   865                 "agent_message" | "agent_reasoning" => "assistant",
   866                 _ => "system",
   867             };
   868             let mut row = base_event_obj(
   869                 ctx,
   870                 base_uid,
   871                 "event_msg",
   872                 if payload_type.is_empty() {
   873                     "event_msg"
   874                 } else {
   875                     payload_type.as_str()
   876                 },
   877                 actor,
   878                 &extract_message_text(&payload),
   879                 &payload_json,
   880             );
   881             let turn_id = to_str(payload_obj.get("turn_id"));
   882             if !turn_id.is_empty() {
   883                 row.insert("request_id".to_string(), json!(turn_id.clone()));
   884                 row.insert("item_id".to_string(), json!(turn_id));
   885             }
   886             let status = to_str(payload_obj.get("status"));
   887             if !status.is_empty() {
   888                 row.insert("op_status".to_string(), json!(status));
   889             }
   890             if payload_type == "token_count" {
   891                 let usage = payload_obj
   892                     .get("info")
   893                     .and_then(|v| v.get("last_token_usage"));
   894                 let input_tokens = to_u32(usage.and_then(|v| v.get("input_tokens")));
   895                 let output_tokens = to_u32(usage.and_then(|v| v.get("output_tokens")));
   896                 let cache_read_tokens = to_u32(
   897                     usage
   898                         .and_then(|v| v.get("cached_input_tokens"))
   899                         .or_else(|| usage.and_then(|v| v.get("cache_read_input_tokens"))),
   900                 );
   901                 let cache_write_tokens = to_u32(
   902                     usage
   903                         .and_then(|v| v.get("cache_creation_input_tokens"))
   904                         .or_else(|| usage.and_then(|v| v.get("cache_write_input_tokens"))),
   905                 );
   906  
   907                 let model = to_str(
   908                     payload_obj
   909                         .get("rate_limits")
   910                         .and_then(|v| v.get("limit_name")),
   911                 );
   912                 let fallback_model = to_str(payload_obj.get("model"));
   913                 let fallback_limit_id = to_str(
   914                     payload_obj
   915                         .get("rate_limits")
   916                         .and_then(|v| v.get("limit_id")),
   917                 );
   918                 let resolved_model = if !model.is_empty() {
   919                     canonicalize_model("codex", &model)
   920                 } else if !fallback_model.is_empty() {
   921                     canonicalize_model("codex", &fallback_model)
   922                 } else if !fallback_limit_id.is_empty() {
   923                     canonicalize_model("codex", &fallback_limit_id)
   924                 } else {
   925                     canonicalize_model("codex", model_hint)
   926                 };
   927  
   928                 row.insert("input_tokens".to_string(), json!(input_tokens));
   929                 row.insert("output_tokens".to_string(), json!(output_tokens));
   930                 row.insert("cache_read_tokens".to_string(), json!(cache_read_tokens));
   931                 row.insert("cache_write_tokens".to_string(), json!(cache_write_tokens));
   932                 if !resolved_model.is_empty() {
   933                     row.insert("model".to_string(), json!(resolved_model));
   934                 }
   935                 row.insert(
   936                     "service_tier".to_string(),
   937                     json!(to_str(
   938                         payload_obj
   939                             .get("rate_limits")
   940                             .and_then(|v| v.get("plan_type"))
   941                     )),
   942                 );
   943                 row.insert(
   944                     "token_usage_json".to_string(),
   945                     json!(compact_json(&payload)),
   946                 );
   947             } else if payload_type == "agent_reasoning" {
   948                 row.insert("has_reasoning".to_string(), json!(1u8));
   949             }
   950             events.push(Value::Object(row));
   951         }
   952         "compacted" => {
   953             events.push(Value::Object(base_event_obj(
   954                 ctx,
   955                 base_uid,
   956                 "compacted_raw",
   957                 "compacted",
   958                 "system",
   959                 "",
   960                 &payload_json,
   961             )));
   962  
   963             if let Some(Value::Array(items)) = payload_obj.get("replacement_history") {
   964                 for (idx, item) in items.iter().enumerate() {
   965                     let item_uid = event_uid(
   966                         ctx.source_file,
   967                         ctx.source_generation,
   968                         ctx.source_line_no,
   969                         ctx.source_offset,
   970                         &compact_json(item),
   971                         &format!("compacted:{}", idx),
   972                     );
   973                     let item_type = to_str(item.get("type"));
   974  
   975                     let (kind, payload_type, actor, text) = match item_type.as_str() {
   976                         "message" => (
   977                             "message",
   978                             "message",
   979                             to_str(item.get("role")),
   980                             extract_message_text(item.get("content").unwrap_or(&Value::Null)),
   981                         ),
   982                         "function_call" => (
   983                             "tool_call",
   984                             "function_call",
   985                             "assistant".to_string(),
   986                             to_str(item.get("arguments")),
   987                         ),
   988                         "function_call_output" => (
   989                             "tool_result",
   990                             "function_call_output",
   991                             "tool".to_string(),
   992                             to_str(item.get("output")),
   993                         ),
   994                         "reasoning" => (
   995                             "reasoning",
   996                             "reasoning",
   997                             "assistant".to_string(),
   998                             extract_message_text(item.get("summary").unwrap_or(&Value::Null)),
   999                         ),
  1000                         _ => (
  1001                             "unknown",
  1002                             if item_type.is_empty() {
  1003                                 "unknown"
  1004                             } else {
  1005                                 item_type.as_str()
  1006                             },
  1007                             "system".to_string(),
  1008                             extract_message_text(item),
  1009                         ),
  1010                     };
  1011  
  1012                     let mut row = base_event_obj(
  1013                         ctx,
  1014                         &item_uid,
  1015                         kind,
  1016                         payload_type,
  1017                         if actor.is_empty() {
  1018                             "assistant"
  1019                         } else {
  1020                             actor.as_str()
  1021                         },
  1022                         &text,
  1023                         &compact_json(item),
  1024                     );
  1025                     row.insert("origin_event_id".to_string(), json!(base_uid));
  1026                     events.push(Value::Object(row));
  1027  
  1028                     links.push(build_event_link_row(
  1029                         ctx,
  1030                         &item_uid,
  1031                         base_uid,
  1032                         "compacted_parent",
  1033                         "{}",
  1034                     ));
  1035                 }
  1036             }
  1037         }
  1038         "message" | "function_call" | "function_call_output" | "reasoning" => {
  1039             let event = if top_type == "message" {
  1040                 let role = to_str(record.get("role"));
  1041                 let text = extract_message_text(record.get("content").unwrap_or(&Value::Null));
  1042                 let mut row = base_event_obj(
  1043                     ctx,
  1044                     base_uid,
  1045                     "message",
  1046                     "message",
  1047                     if role.is_empty() {
  1048                         "assistant"
  1049                     } else {
  1050                         role.as_str()
  1051                     },
  1052                     &text,
  1053                     &compact_json(record),
  1054                 );
  1055                 row.insert(
  1056                     "content_types".to_string(),
  1057                     json!(extract_content_types(
  1058                         record.get("content").unwrap_or(&Value::Null)
  1059                     )),
  1060                 );
  1061                 Value::Object(row)
  1062             } else if top_type == "function_call" {
  1063                 let args = to_str(record.get("arguments"));
  1064                 let call_id = to_str(record.get("call_id"));
  1065                 let name = to_str(record.get("name"));
  1066                 let mut row = base_event_obj(
  1067                     ctx,
  1068                     base_uid,
  1069                     "tool_call",
  1070                     "function_call",
  1071                     "assistant",
  1072                     &args,
  1073                     &compact_json(record),
  1074                 );
  1075                 row.insert("tool_call_id".to_string(), json!(call_id.clone()));
  1076                 row.insert("tool_name".to_string(), json!(name.clone()));
  1077                 tools.push(build_tool_row(
  1078                     ctx, base_uid, &call_id, "", &name, "request", 0, &args, "", "",
  1079                 ));
  1080                 Value::Object(row)
  1081             } else if top_type == "function_call_output" {
  1082                 let output = to_str(record.get("output"));
  1083                 let call_id = to_str(record.get("call_id"));
  1084                 let mut row = base_event_obj(
  1085                     ctx,
  1086                     base_uid,
  1087                     "tool_result",
  1088                     "function_call_output",
  1089                     "tool",
  1090                     &output,
  1091                     &compact_json(record),
  1092                 );
  1093                 row.insert("tool_call_id".to_string(), json!(call_id.clone()));
  1094                 tools.push(build_tool_row(
  1095                     ctx,
  1096                     base_uid,
  1097                     &call_id,
  1098                     "",
  1099                     "",
  1100                     "response",
  1101                     0,
  1102                     "",
  1103                     &compact_json(record.get("output").unwrap_or(&Value::Null)),
  1104                     &output,
  1105                 ));
  1106                 Value::Object(row)
  1107             } else {
  1108                 let summary = record.get("summary").cloned().unwrap_or(Value::Null);
  1109                 let mut row = base_event_obj(
  1110                     ctx,
  1111                     base_uid,
  1112                     "reasoning",
  1113                     "reasoning",
  1114                     "assistant",
  1115                     &extract_message_text(&summary),
  1116                     &compact_json(record),
  1117                 );
  1118                 row.insert("has_reasoning".to_string(), json!(1u8));
  1119                 Value::Object(row)
  1120             };
  1121  
  1122             events.push(event);
  1123         }
  1124         _ => {
  1125             events.push(Value::Object(base_event_obj(
  1126                 ctx,
  1127                 base_uid,
  1128                 "unknown",
  1129                 if top_type.is_empty() {
  1130                     "unknown"
  1131                 } else {
  1132                     top_type
  1133                 },
  1134                 "system",
  1135                 &extract_message_text(record),
  1136                 &compact_json(record),
  1137             )));
  1138         }
  1139     }
  1140  
  1141     let payload_model = canonicalize_model("codex", &to_str(payload_obj.get("model")));
  1142     let inherited_model = canonicalize_model("codex", model_hint);
  1143     for event in &mut events {
  1144         if let Some(row) = event.as_object_mut() {
  1145             let row_model = canonicalize_model("codex", &to_str(row.get("model")));
  1146             let resolved_model = if !row_model.is_empty() {
  1147                 row_model
  1148             } else if !payload_model.is_empty() {
  1149                 payload_model.clone()
  1150             } else {
  1151                 inherited_model.clone()
  1152             };
  1153  
  1154             if !resolved_model.is_empty() {
  1155                 row.insert("model".to_string(), json!(resolved_model));
  1156             }
  1157         }
  1158     }
  1159  
  1160     let parent = to_str(record.get("parent_id"));
  1161     if !events.is_empty() && !parent.is_empty() {
  1162         if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
  1163             push_parent_link(&mut links, uid, &parent);
  1164         }
  1165     }
  1166  
  1167     (events, links, tools)
  1168 }
  1169  
  1170 fn normalize_claude_event(
  1171     record: &Value,
  1172     ctx: &RecordContext<'_>,
  1173     top_type: &str,
  1174     base_uid: &str,
  1175 ) -> (Vec<Value>, Vec<Value>, Vec<Value>) {
  1176     let mut events = Vec::<Value>::new();
  1177     let mut links = Vec::<Value>::new();
  1178     let mut tools = Vec::<Value>::new();
  1179  
  1180     let parent_uuid = to_str(record.get("parentUuid"));
  1181     let request_id = to_str(record.get("requestId"));
  1182     let trace_id = to_str(record.get("requestId"));
  1183     let agent_run_id = to_str(record.get("agentId"));
  1184     let agent_label = to_str(record.get("agentName"));
  1185     let coord_group_label = to_str(record.get("teamName"));
  1186     let is_substream = to_u8_bool(record.get("isSidechain"));
  1187  
  1188     let message = record.get("message").cloned().unwrap_or(Value::Null);
  1189     let msg_role = to_str(message.get("role"));
  1190     let model = canonicalize_model("claude", &to_str(message.get("model")));
  1191  
  1192     let usage = message.get("usage").cloned().unwrap_or(Value::Null);
  1193     let input_tokens = to_u32(usage.get("input_tokens"));
  1194     let output_tokens = to_u32(usage.get("output_tokens"));
  1195     let cache_read_tokens = to_u32(usage.get("cache_read_input_tokens"));
  1196     let cache_write_tokens = to_u32(usage.get("cache_creation_input_tokens"));
  1197     let service_tier = to_str(usage.get("service_tier"));
  1198  
  1199     let stamp_common = |obj: &mut Map<String, Value>| {
  1200         obj.insert("request_id".to_string(), json!(request_id.clone()));
  1201         obj.insert("trace_id".to_string(), json!(trace_id.clone()));
  1202         obj.insert("agent_run_id".to_string(), json!(agent_run_id.clone()));
  1203         obj.insert("agent_label".to_string(), json!(agent_label.clone()));
  1204         obj.insert(
  1205             "coord_group_label".to_string(),
  1206             json!(coord_group_label.clone()),
  1207         );
  1208         obj.insert("is_substream".to_string(), json!(is_substream));
  1209         obj.insert("model".to_string(), json!(model.clone()));
  1210         obj.insert("input_tokens".to_string(), json!(input_tokens));
  1211         obj.insert("output_tokens".to_string(), json!(output_tokens));
  1212         obj.insert("cache_read_tokens".to_string(), json!(cache_read_tokens));
  1213         obj.insert("cache_write_tokens".to_string(), json!(cache_write_tokens));
  1214         obj.insert("service_tier".to_string(), json!(service_tier.clone()));
  1215         obj.insert("item_id".to_string(), json!(to_str(record.get("uuid"))));
  1216         obj.insert(
  1217             "origin_event_id".to_string(),
  1218             json!(to_str(record.get("sourceToolAssistantUUID"))),
  1219         );
  1220         obj.insert(
  1221             "origin_tool_call_id".to_string(),
  1222             json!(to_str(record.get("sourceToolUseID"))),
  1223         );
  1224     };
  1225  
  1226     if top_type == "assistant" || top_type == "user" {
  1227         let actor = if top_type == "assistant" {
  1228             "assistant"
  1229         } else if msg_role == "assistant" {
  1230             "assistant"
  1231         } else {
  1232             "user"
  1233         };
  1234  
  1235         let content = message.get("content").cloned().unwrap_or_else(|| {
  1236             record
  1237                 .get("message")
  1238                 .and_then(|m| m.get("content"))
  1239                 .cloned()
  1240                 .unwrap_or(Value::Null)
  1241         });
  1242  
  1243         match content {
  1244             Value::Array(items) if !items.is_empty() => {
  1245                 for (idx, item) in items.iter().enumerate() {
  1246                     let block_type = to_str(item.get("type"));
  1247                     let suffix = format!("claude:block:{}", idx);
  1248                     let block_uid = event_uid(
  1249                         ctx.source_file,
  1250                         ctx.source_generation,
  1251                         ctx.source_line_no,
  1252                         ctx.source_offset,
  1253                         &compact_json(item),
  1254                         &suffix,
  1255                     );
  1256  
  1257                     let mut row = match block_type.as_str() {
  1258                         "thinking" => {
  1259                             let mut r = base_event_obj(
  1260                                 ctx,
  1261                                 &block_uid,
  1262                                 "reasoning",
  1263                                 "thinking",
  1264                                 "assistant",
  1265                                 &extract_message_text(item),
  1266                                 &compact_json(item),
  1267                             );
  1268                             r.insert("has_reasoning".to_string(), json!(1u8));
  1269                             r.insert("content_types".to_string(), json!(["thinking"]));
  1270                             r
  1271                         }
  1272                         "tool_use" => {
  1273                             let tool_call_id = to_str(item.get("id"));
  1274                             let tool_name = to_str(item.get("name"));
  1275                             let input_json =
  1276                                 compact_json(item.get("input").unwrap_or(&Value::Null));
  1277                             let mut r = base_event_obj(
  1278                                 ctx,
  1279                                 &block_uid,
  1280                                 "tool_call",
  1281                                 "tool_use",
  1282                                 "assistant",
  1283                                 &extract_message_text(item.get("input").unwrap_or(&Value::Null)),
  1284                                 &compact_json(item),
  1285                             );
  1286                             r.insert("content_types".to_string(), json!(["tool_use"]));
  1287                             r.insert("tool_call_id".to_string(), json!(tool_call_id.clone()));
  1288                             r.insert("tool_name".to_string(), json!(tool_name.clone()));
  1289                             tools.push(build_tool_row(
  1290                                 ctx,
  1291                                 &block_uid,
  1292                                 &tool_call_id,
  1293                                 &to_str(record.get("parentToolUseID")),
  1294                                 &tool_name,
  1295                                 "request",
  1296                                 0,
  1297                                 &input_json,
  1298                                 "",
  1299                                 "",
  1300                             ));
  1301                             r
  1302                         }
  1303                         "tool_result" => {
  1304                             let tool_call_id = to_str(item.get("tool_use_id"));
  1305                             let output_json =
  1306                                 compact_json(item.get("content").unwrap_or(&Value::Null));
  1307                             let output_text =
  1308                                 extract_message_text(item.get("content").unwrap_or(&Value::Null));
  1309                             let tool_error = to_u8_bool(item.get("is_error"));
  1310                             let mut r = base_event_obj(
  1311                                 ctx,
  1312                                 &block_uid,
  1313                                 "tool_result",
  1314                                 "tool_result",
  1315                                 "tool",
  1316                                 &output_text,
  1317                                 &compact_json(item),
  1318                             );
  1319                             r.insert("content_types".to_string(), json!(["tool_result"]));
  1320                             r.insert("tool_call_id".to_string(), json!(tool_call_id.clone()));
  1321                             r.insert("tool_error".to_string(), json!(tool_error));
  1322                             tools.push(build_tool_row(
  1323                                 ctx,
  1324                                 &block_uid,
  1325                                 &tool_call_id,
  1326                                 &to_str(record.get("parentToolUseID")),
  1327                                 "",
  1328                                 "response",
  1329                                 tool_error,
  1330                                 "",
  1331                                 &output_json,
  1332                                 &output_text,
  1333                             ));
  1334                             r
  1335                         }
  1336                         _ => {
  1337                             let mut r = base_event_obj(
  1338                                 ctx,
  1339                                 &block_uid,
  1340                                 "message",
  1341                                 if block_type.is_empty() {
  1342                                     "text"
  1343                                 } else {
  1344                                     block_type.as_str()
  1345                                 },
  1346                                 actor,
  1347                                 &extract_message_text(item),
  1348                                 &compact_json(item),
  1349                             );
  1350                             if !block_type.is_empty() {
  1351                                 r.insert("content_types".to_string(), json!([block_type]));
  1352                             }
  1353                             r
  1354                         }
  1355                     };
  1356  
  1357                     stamp_common(&mut row);
  1358                     row.insert(
  1359                         "parent_tool_call_id".to_string(),
  1360                         json!(to_str(record.get("parentToolUseID"))),
  1361                     );
  1362                     row.insert(
  1363                         "origin_tool_call_id".to_string(),
  1364                         json!(to_str(record.get("sourceToolUseID"))),
  1365                     );
  1366                     row.insert(
  1367                         "tool_phase".to_string(),
  1368                         json!(to_str(record.get("stop_reason"))),
  1369                     );
  1370                     events.push(Value::Object(row));
  1371  
  1372                     if !parent_uuid.is_empty() {
  1373                         links.push(build_external_link_row(
  1374                             ctx,
  1375                             &block_uid,
  1376                             &parent_uuid,
  1377                             "parent_uuid",
  1378                             "{}",
  1379                         ));
  1380                     }
  1381                 }
  1382             }
  1383             _ => {
  1384                 let text = extract_message_text(&message);
  1385                 let mut row = base_event_obj(
  1386                     ctx,
  1387                     base_uid,
  1388                     "message",
  1389                     "message",
  1390                     actor,
  1391                     &text,
  1392                     &compact_json(record),
  1393                 );
  1394                 row.insert(
  1395                     "content_types".to_string(),
  1396                     json!(extract_content_types(
  1397                         message.get("content").unwrap_or(&Value::Null)
  1398                     )),
  1399                 );
  1400                 stamp_common(&mut row);
  1401                 events.push(Value::Object(row));
  1402                 if !parent_uuid.is_empty() {
  1403                     links.push(build_external_link_row(
  1404                         ctx,
  1405                         base_uid,
  1406                         &parent_uuid,
  1407                         "parent_uuid",
  1408                         "{}",
  1409                     ));
  1410                 }
  1411             }
  1412         }
  1413     } else {
  1414         let event_kind = match top_type {
  1415             "progress" => "progress",
  1416             "system" => "system",
  1417             "summary" => "summary",
  1418             "queue-operation" => "queue_operation",
  1419             "file-history-snapshot" => "file_history_snapshot",
  1420             _ => "unknown",
  1421         };
  1422  
  1423         let payload_type = if top_type == "progress" {
  1424             to_str(record.get("data").and_then(|d| d.get("type")))
  1425         } else if top_type == "system" {
  1426             to_str(record.get("subtype"))
  1427         } else {
  1428             top_type.to_string()
  1429         };
  1430  
  1431         let mut row = base_event_obj(
  1432             ctx,
  1433             base_uid,
  1434             event_kind,
  1435             if payload_type.is_empty() {
  1436                 top_type
  1437             } else {
  1438                 payload_type.as_str()
  1439             },
  1440             "system",
  1441             &extract_message_text(record),
  1442             &compact_json(record),
  1443         );
  1444         row.insert("op_kind".to_string(), json!(payload_type));
  1445         row.insert("op_status".to_string(), json!(to_str(record.get("status"))));
  1446         row.insert(
  1447             "latency_ms".to_string(),
  1448             json!(to_u32(record.get("durationMs"))),
  1449         );
  1450         row.insert(
  1451             "retry_count".to_string(),
  1452             json!(to_u16(record.get("retryAttempt"))),
  1453         );
  1454         stamp_common(&mut row);
  1455         events.push(Value::Object(row));
  1456  
  1457         if !parent_uuid.is_empty() {
  1458             links.push(build_external_link_row(
  1459                 ctx,
  1460                 base_uid,
  1461                 &parent_uuid,
  1462                 "parent_uuid",
  1463                 "{}",
  1464             ));
  1465         }
  1466     }
  1467  
  1468     if !events.is_empty() {
  1469         let tool_use_id = to_str(record.get("toolUseID"));
  1470         if !tool_use_id.is_empty() {
  1471             if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
  1472                 links.push(build_external_link_row(
  1473                     ctx,
  1474                     uid,
  1475                     &tool_use_id,
  1476                     "tool_use_id",
  1477                     "{}",
  1478                 ));
  1479             }
  1480         }
  1481  
  1482         let source_tool_assistant = to_str(record.get("sourceToolAssistantUUID"));
  1483         if !source_tool_assistant.is_empty() {
  1484             if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
  1485                 links.push(build_external_link_row(
  1486                     ctx,
  1487                     uid,
  1488                     &source_tool_assistant,
  1489                     "source_tool_assistant",
  1490                     "{}",
  1491                 ));
  1492             }
  1493         }
  1494     }
  1495  
  1496     (events, links, tools)
  1497 }
  1498  
  1499 pub fn normalize_record(
  1500     record: &Value,
  1501     source_name: &str,
  1502     provider: &str,
  1503     source_file: &str,
  1504     source_inode: u64,
  1505     source_generation: u32,
  1506     source_line_no: u64,
  1507     source_offset: u64,
  1508     session_hint: &str,
  1509     model_hint: &str,
  1510 ) -> Result<NormalizedRecord> {
  1511     let provider = Provider::parse(provider)?;
  1512     let provider_name = provider.as_str();
  1513     let record_ts = to_str(record.get("timestamp"));
  1514     let (event_ts, event_ts_parse_failed) = parse_event_ts(&record_ts);
  1515     let top_type = to_str(record.get("type"));
  1516  
  1517     let mut session_id = if provider == Provider::Claude {
  1518         to_str(record.get("sessionId"))
  1519     } else {
  1520         String::new()
  1521     };
  1522     if session_id.is_empty() {
  1523         session_id = if session_hint.is_empty() {
  1524             infer_session_id_from_file(source_file)
  1525         } else {
  1526             session_hint.to_string()
  1527         };
  1528     }
  1529  
  1530     if provider == Provider::Codex && top_type == "session_meta" {
  1531         let payload = record.get("payload").cloned().unwrap_or(Value::Null);
  1532         let payload_id = to_str(payload.get("id"));
  1533         if !payload_id.is_empty() {
  1534             session_id = payload_id;
  1535         }
  1536     }
  1537  
  1538     let session_date = infer_session_date_from_file(source_file, &record_ts);
  1539  
  1540     let raw_json = compact_json(record);
  1541     let base_uid = event_uid(
  1542         source_file,
  1543         source_generation,
  1544         source_line_no,
  1545         source_offset,
  1546         &raw_json,
  1547         "raw",
  1548     );
  1549  
  1550     let raw_row = json!({
  1551         "source_name": source_name,
  1552         "provider": provider_name,
  1553         "source_file": source_file,
  1554         "source_inode": source_inode,
  1555         "source_generation": source_generation,
  1556         "source_line_no": source_line_no,
  1557         "source_offset": source_offset,
  1558         "record_ts": record_ts,
  1559         "top_type": top_type,
  1560         "session_id": session_id,
  1561         "raw_json": raw_json,
  1562         "raw_json_hash": raw_hash(&raw_json),
  1563         "event_uid": base_uid,
  1564     });
  1565  
  1566     let mut error_rows = Vec::<Value>::new();
  1567     if event_ts_parse_failed {
  1568         error_rows.push(json!({
  1569             "source_name": source_name,
  1570             "provider": provider_name,
  1571             "source_file": source_file,
  1572             "source_inode": source_inode,
  1573             "source_generation": source_generation,
  1574             "source_line_no": source_line_no,
  1575             "source_offset": source_offset,
  1576             "error_kind": "timestamp_parse_error",
  1577             "error_text": format!(
  1578                 "timestamp is missing or not RFC3339; used {} UTC fallback",
  1579                 UNPARSEABLE_EVENT_TS
  1580             ),
  1581             "raw_fragment": truncate_chars(&record_ts, 20_000),
  1582         }));
  1583     }
  1584  
  1585     let ctx = RecordContext {
  1586         source_name,
  1587         provider: provider_name,
  1588         session_id: &session_id,
  1589         session_date: &session_date,
  1590         source_file,
  1591         source_inode,
  1592         source_generation,
  1593         source_line_no,
  1594         source_offset,
  1595         record_ts: &record_ts,
  1596         event_ts: &event_ts,
  1597     };
  1598  
  1599     let (event_rows, link_rows, tool_rows) = if provider == Provider::Claude {
  1600         normalize_claude_event(record, &ctx, &top_type, &base_uid)
  1601     } else {
  1602         normalize_codex_event(record, &ctx, &top_type, &base_uid, model_hint)
  1603     };
  1604     let model_hint = resolve_model_hint(&event_rows, provider_name, model_hint);
  1605  
  1606     Ok(NormalizedRecord {
  1607         raw_row,
  1608         event_rows,
  1609         link_rows,
  1610         tool_rows,
  1611         error_rows,
  1612         session_hint: session_id,
  1613         model_hint,
  1614     })
  1615 }
  1616  
  1617 #[cfg(test)]
  1618 mod tests {
  1619     use super::{build_link_row, normalize_record, RecordContext};
  1620     use serde_json::json;
  1621     use std::collections::HashMap;
  1622  
  1623     #[test]
  1624     fn codex_tool_call_normalization() {
  1625         let record = json!({
  1626             "timestamp": "2026-02-14T02:28:00.000Z",
  1627             "type": "response_item",
  1628             "payload": {
  1629                 "type": "function_call",
  1630                 "call_id": "call_123",
  1631                 "name": "Read",
  1632                 "arguments": "{\"path\":\"README.md\"}"
  1633             }
  1634         });
  1635  
  1636         let out = normalize_record(
  1637             &record,
  1638             "codex",
  1639             "codex",
  1640             "/Users/eric/.codex/sessions/2026/02/13/session-019c59f9-6389-77a1-a0cb-304eecf935b6.jsonl",
  1641             123,
  1642             1,
  1643             42,
  1644             1024,
  1645             "",
  1646             "",
  1647         )
  1648         .expect("codex tool call should normalize");
  1649  
  1650         assert_eq!(out.event_rows.len(), 1);
  1651         assert_eq!(out.tool_rows.len(), 1);
  1652         assert!(out.error_rows.is_empty());
  1653         let row = out.event_rows[0].as_object().unwrap();
  1654         assert_eq!(
  1655             row.get("event_kind").unwrap().as_str().unwrap(),
  1656             "tool_call"
  1657         );
  1658         assert_eq!(row.get("tool_name").unwrap().as_str().unwrap(), "Read");
  1659     }
  1660  
  1661     #[test]
  1662     fn codex_turn_context_promotes_model_and_turn_id() {
  1663         let record = json!({
  1664             "timestamp": "2026-02-15T03:50:42.191Z",
  1665             "type": "turn_context",
  1666             "payload": {
  1667                 "turn_id": "019c5f6a-49bd-7920-ac67-1dd8e33b0e95",
  1668                 "model": "gpt-5.3-codex"
  1669             }
  1670         });
  1671  
  1672         let out = normalize_record(
  1673             &record,
  1674             "codex",
  1675             "codex",
  1676             "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  1677             1,
  1678             1,
  1679             1,
  1680             1,
  1681             "",
  1682             "",
  1683         )
  1684         .expect("codex turn context should normalize");
  1685  
  1686         let row = out.event_rows[0].as_object().unwrap();
  1687         assert_eq!(
  1688             row.get("payload_type").unwrap().as_str().unwrap(),
  1689             "turn_context"
  1690         );
  1691         assert_eq!(row.get("model").unwrap().as_str().unwrap(), "gpt-5.3-codex");
  1692         assert_eq!(
  1693             row.get("request_id").unwrap().as_str().unwrap(),
  1694             "019c5f6a-49bd-7920-ac67-1dd8e33b0e95"
  1695         );
  1696         assert_eq!(
  1697             row.get("item_id").unwrap().as_str().unwrap(),
  1698             "019c5f6a-49bd-7920-ac67-1dd8e33b0e95"
  1699         );
  1700     }
  1701  
  1702     #[test]
  1703     fn codex_token_count_promotes_usage_fields() {
  1704         let record = json!({
  1705             "timestamp": "2026-02-15T03:50:50.838Z",
  1706             "type": "event_msg",
  1707             "payload": {
  1708                 "type": "token_count",
  1709                 "info": {
  1710                     "last_token_usage": {
  1711                         "input_tokens": 65323,
  1712                         "output_tokens": 445,
  1713                         "cached_input_tokens": 58624
  1714                     }
  1715                 },
  1716                 "rate_limits": {
  1717                     "limit_name": "GPT-5.3-Codex-Spark",
  1718                     "limit_id": "codex_bengalfox",
  1719                     "plan_type": "pro"
  1720                 }
  1721             }
  1722         });
  1723  
  1724         let out = normalize_record(
  1725             &record,
  1726             "codex",
  1727             "codex",
  1728             "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  1729             1,
  1730             1,
  1731             2,
  1732             2,
  1733             "",
  1734             "",
  1735         )
  1736         .expect("codex token count should normalize");
  1737  
  1738         let row = out.event_rows[0].as_object().unwrap();
  1739         assert_eq!(
  1740             row.get("payload_type").unwrap().as_str().unwrap(),
  1741             "token_count"
  1742         );
  1743         assert_eq!(row.get("input_tokens").unwrap().as_u64().unwrap(), 65323);
  1744         assert_eq!(row.get("output_tokens").unwrap().as_u64().unwrap(), 445);
  1745         assert_eq!(
  1746             row.get("cache_read_tokens").unwrap().as_u64().unwrap(),
  1747             58624
  1748         );
  1749         assert_eq!(
  1750             row.get("model").unwrap().as_str().unwrap(),
  1751             "gpt-5.3-codex-spark"
  1752         );
  1753         assert_eq!(row.get("service_tier").unwrap().as_str().unwrap(), "pro");
  1754         assert!(!row
  1755             .get("token_usage_json")
  1756             .unwrap()
  1757             .as_str()
  1758             .unwrap()
  1759             .is_empty());
  1760     }
  1761  
  1762     #[test]
  1763     fn codex_token_count_alias_codex_maps_to_xhigh() {
  1764         let record = json!({
  1765             "timestamp": "2026-02-15T04:52:55.538Z",
  1766             "type": "event_msg",
  1767             "payload": {
  1768                 "type": "token_count",
  1769                 "info": {
  1770                     "last_token_usage": {
  1771                         "input_tokens": 72636,
  1772                         "output_tokens": 285,
  1773                         "cached_input_tokens": 70784
  1774                     }
  1775                 },
  1776                 "rate_limits": {
  1777                     "limit_id": "codex",
  1778                     "limit_name": null,
  1779                     "plan_type": "pro"
  1780                 }
  1781             }
  1782         });
  1783  
  1784         let out = normalize_record(
  1785             &record,
  1786             "codex",
  1787             "codex",
  1788             "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  1789             1,
  1790             1,
  1791             4,
  1792             4,
  1793             "",
  1794             "",
  1795         )
  1796         .expect("codex token count alias should normalize");
  1797  
  1798         let row = out.event_rows[0].as_object().unwrap();
  1799         assert_eq!(
  1800             row.get("model").unwrap().as_str().unwrap(),
  1801             "gpt-5.3-codex-xhigh"
  1802         );
  1803     }
  1804  
  1805     #[test]
  1806     fn codex_custom_tool_call_promotes_tool_fields() {
  1807         let record = json!({
  1808             "timestamp": "2026-02-15T03:50:50.838Z",
  1809             "type": "response_item",
  1810             "payload": {
  1811                 "type": "custom_tool_call",
  1812                 "call_id": "call_abc",
  1813                 "name": "apply_patch",
  1814                 "status": "completed",
  1815                 "input": "*** Begin Patch\n*** End Patch\n"
  1816             }
  1817         });
  1818  
  1819         let out = normalize_record(
  1820             &record,
  1821             "codex",
  1822             "codex",
  1823             "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  1824             1,
  1825             1,
  1826             3,
  1827             3,
  1828             "",
  1829             "",
  1830         )
  1831         .expect("codex custom tool call should normalize");
  1832  
  1833         assert_eq!(out.event_rows.len(), 1);
  1834         assert_eq!(out.tool_rows.len(), 1);
  1835         let row = out.event_rows[0].as_object().unwrap();
  1836         assert_eq!(
  1837             row.get("event_kind").unwrap().as_str().unwrap(),
  1838             "tool_call"
  1839         );
  1840         assert_eq!(
  1841             row.get("tool_call_id").unwrap().as_str().unwrap(),
  1842             "call_abc"
  1843         );
  1844         assert_eq!(
  1845             row.get("tool_name").unwrap().as_str().unwrap(),
  1846             "apply_patch"
  1847         );
  1848         assert_eq!(row.get("op_status").unwrap().as_str().unwrap(), "completed");
  1849     }
  1850  
  1851     #[test]
  1852     fn claude_tool_use_and_result_blocks() {
  1853         let record = json!({
  1854             "type": "assistant",
  1855             "sessionId": "7c666c01-d38e-4658-8650-854ffb5b626e",
  1856             "uuid": "assistant-1",
  1857             "parentUuid": "user-1",
  1858             "requestId": "req-1",
  1859             "timestamp": "2026-01-19T15:58:41.421Z",
  1860             "message": {
  1861                 "model": "claude-opus-4-5-20251101",
  1862                 "role": "assistant",
  1863                 "usage": {
  1864                     "input_tokens": 9,
  1865                     "output_tokens": 5,
  1866                     "cache_creation_input_tokens": 19630,
  1867                     "cache_read_input_tokens": 0,
  1868                     "service_tier": "standard"
  1869                 },
  1870                 "content": [
  1871                     {
  1872                         "type": "tool_use",
  1873                         "id": "toolu_1",
  1874                         "name": "WebFetch",
  1875                         "input": {"url": "https://example.com"}
  1876                     },
  1877                     {
  1878                         "type": "text",
  1879                         "text": "done"
  1880                     }
  1881                 ]
  1882             }
  1883         });
  1884  
  1885         let out = normalize_record(
  1886             &record,
  1887             "claude",
  1888             "claude",
  1889             "/Users/eric/.claude/projects/p1/s1.jsonl",
  1890             55,
  1891             2,
  1892             10,
  1893             100,
  1894             "",
  1895             "",
  1896         )
  1897         .expect("claude event should normalize");
  1898  
  1899         assert_eq!(out.event_rows.len(), 2);
  1900         assert_eq!(out.tool_rows.len(), 1);
  1901  
  1902         let first = out.event_rows[0].as_object().unwrap();
  1903         assert_eq!(
  1904             first.get("event_kind").unwrap().as_str().unwrap(),
  1905             "tool_call"
  1906         );
  1907         assert_eq!(first.get("provider").unwrap().as_str().unwrap(), "claude");
  1908         assert!(out.error_rows.is_empty());
  1909     }
  1910  
  1911     #[test]
  1912     fn invalid_timestamp_uses_epoch_and_emits_timestamp_parse_error() {
  1913         let record = json!({
  1914             "timestamp": "not-a-timestamp",
  1915             "type": "response_item",
  1916             "payload": {
  1917                 "type": "function_call",
  1918                 "call_id": "call_bad_ts",
  1919                 "name": "Read",
  1920                 "arguments": "{}"
  1921             }
  1922         });
  1923  
  1924         let out = normalize_record(
  1925             &record,
  1926             "codex",
  1927             "codex",
  1928             "/Users/eric/.codex/sessions/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  1929             9,
  1930             2,
  1931             7,
  1932             99,
  1933             "",
  1934             "",
  1935         )
  1936         .expect("codex event with invalid timestamp should normalize");
  1937  
  1938         let event_row = out.event_rows[0].as_object().unwrap();
  1939         assert_eq!(
  1940             event_row.get("event_ts").unwrap().as_str().unwrap(),
  1941             "1970-01-01 00:00:00.000"
  1942         );
  1943         assert_eq!(
  1944             event_row.get("session_date").unwrap().as_str().unwrap(),
  1945             "1970-01-01"
  1946         );
  1947  
  1948         assert_eq!(out.error_rows.len(), 1);
  1949         let error = out.error_rows[0].as_object().unwrap();
  1950         assert_eq!(
  1951             error.get("error_kind").unwrap().as_str().unwrap(),
  1952             "timestamp_parse_error"
  1953         );
  1954         assert_eq!(
  1955             error.get("raw_fragment").unwrap().as_str().unwrap(),
  1956             "not-a-timestamp"
  1957         );
  1958     }
  1959  
  1960     #[test]
  1961     fn invalid_timestamp_preserves_session_date_from_source_path() {
  1962         let record = json!({
  1963             "timestamp": "still-not-a-timestamp",
  1964             "type": "response_item",
  1965             "payload": {
  1966                 "type": "function_call",
  1967                 "call_id": "call_bad_ts",
  1968                 "name": "Read",
  1969                 "arguments": "{}"
  1970             }
  1971         });
  1972  
  1973         let out = normalize_record(
  1974             &record,
  1975             "codex",
  1976             "codex",
  1977             "/Users/eric/.codex/sessions/2026/02/16/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  1978             11,
  1979             4,
  1980             12,
  1981             144,
  1982             "",
  1983             "",
  1984         )
  1985         .expect("codex event should normalize while preserving session date from path");
  1986  
  1987         let event_row = out.event_rows[0].as_object().unwrap();
  1988         assert_eq!(
  1989             event_row.get("event_ts").unwrap().as_str().unwrap(),
  1990             "1970-01-01 00:00:00.000"
  1991         );
  1992         assert_eq!(
  1993             event_row.get("session_date").unwrap().as_str().unwrap(),
  1994             "2026-02-16"
  1995         );
  1996         assert_eq!(out.error_rows.len(), 1);
  1997     }
  1998  
  1999     #[test]
  2000     fn unknown_provider_is_rejected() {
  2001         let record = json!({
  2002             "timestamp": "2026-02-15T03:50:42.191Z",
  2003             "type": "turn_context",
  2004         });
  2005  
  2006         let err = normalize_record(
  2007             &record,
  2008             "unknown",
  2009             "unknown",
  2010             "/tmp/sessions/session-1.jsonl",
  2011             1,
  2012             1,
  2013             1,
  2014             1,
  2015             "",
  2016             "",
  2017         )
  2018         .expect_err("unknown provider should be rejected");
  2019  
  2020         assert!(
  2021             err.to_string().contains("unsupported provider"),
  2022             "unexpected error: {err:#}"
  2023         );
  2024     }
  2025  
  2026     #[test]
  2027     fn claude_links_split_event_uids_from_external_ids() {
  2028         let record = json!({
  2029             "type": "assistant",
  2030             "sessionId": "7c666c01-d38e-4658-8650-854ffb5b626e",
  2031             "uuid": "assistant-2",
  2032             "parentUuid": "user-parent-2",
  2033             "toolUseID": "toolu_42",
  2034             "sourceToolAssistantUUID": "assistant-root-1",
  2035             "requestId": "req-2",
  2036             "timestamp": "2026-01-19T15:59:41.421Z",
  2037             "message": {
  2038                 "role": "assistant",
  2039                 "content": "done"
  2040             }
  2041         });
  2042  
  2043         let out = normalize_record(
  2044             &record,
  2045             "claude",
  2046             "claude",
  2047             "/Users/eric/.claude/projects/p1/s1.jsonl",
  2048             55,
  2049             2,
  2050             11,
  2051             101,
  2052             "",
  2053             "",
  2054         )
  2055         .expect("claude assistant record should normalize");
  2056  
  2057         assert_eq!(out.link_rows.len(), 3);
  2058  
  2059         let by_type = out
  2060             .link_rows
  2061             .iter()
  2062             .map(|row| {
  2063                 let obj = row.as_object().expect("link row object");
  2064                 let link_type = obj
  2065                     .get("link_type")
  2066                     .and_then(|v| v.as_str())
  2067                     .expect("link_type")
  2068                     .to_string();
  2069                 (link_type, obj.clone())
  2070             })
  2071             .collect::<HashMap<_, _>>();
  2072  
  2073         let parent = by_type.get("parent_uuid").expect("parent_uuid link");
  2074         assert_eq!(
  2075             parent
  2076                 .get("linked_external_id")
  2077                 .and_then(|v| v.as_str())
  2078                 .unwrap(),
  2079             "user-parent-2"
  2080         );
  2081         assert_eq!(
  2082             parent
  2083                 .get("linked_event_uid")
  2084                 .and_then(|v| v.as_str())
  2085                 .unwrap(),
  2086             ""
  2087         );
  2088  
  2089         let tool_use = by_type.get("tool_use_id").expect("tool_use_id link");
  2090         assert_eq!(
  2091             tool_use
  2092                 .get("linked_external_id")
  2093                 .and_then(|v| v.as_str())
  2094                 .unwrap(),
  2095             "toolu_42"
  2096         );
  2097         assert_eq!(
  2098             tool_use
  2099                 .get("linked_event_uid")
  2100                 .and_then(|v| v.as_str())
  2101                 .unwrap(),
  2102             ""
  2103         );
  2104  
  2105         let source_tool = by_type
  2106             .get("source_tool_assistant")
  2107             .expect("source_tool_assistant link");
  2108         assert_eq!(
  2109             source_tool
  2110                 .get("linked_external_id")
  2111                 .and_then(|v| v.as_str())
  2112                 .unwrap(),
  2113             "assistant-root-1"
  2114         );
  2115         assert_eq!(
  2116             source_tool
  2117                 .get("linked_event_uid")
  2118                 .and_then(|v| v.as_str())
  2119                 .unwrap(),
  2120             ""
  2121         );
  2122     }
  2123  
  2124     #[test]
  2125     fn codex_compacted_parent_link_uses_event_uid_target() {
  2126         let record = json!({
  2127             "timestamp": "2026-02-15T03:50:50.838Z",
  2128             "type": "compacted",
  2129             "payload": {
  2130                 "replacement_history": [
  2131                     {
  2132                         "type": "message",
  2133                         "role": "assistant",
  2134                         "content": [
  2135                             {"type": "text", "text": "hello"}
  2136                         ]
  2137                     }
  2138                 ]
  2139             }
  2140         });
  2141  
  2142         let out = normalize_record(
  2143             &record,
  2144             "codex",
  2145             "codex",
  2146             "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  2147             1,
  2148             1,
  2149             12,
  2150             12,
  2151             "",
  2152             "",
  2153         )
  2154         .expect("compacted record should normalize");
  2155  
  2156         let compacted_uid = out.event_rows[0]
  2157             .get("event_uid")
  2158             .and_then(|v| v.as_str())
  2159             .expect("compacted event uid");
  2160         let link = out.link_rows[0].as_object().expect("compacted link");
  2161  
  2162         assert_eq!(
  2163             link.get("link_type").and_then(|v| v.as_str()).unwrap(),
  2164             "compacted_parent"
  2165         );
  2166         assert_eq!(
  2167             link.get("linked_event_uid")
  2168                 .and_then(|v| v.as_str())
  2169                 .unwrap(),
  2170             compacted_uid
  2171         );
  2172         assert_eq!(
  2173             link.get("linked_external_id")
  2174                 .and_then(|v| v.as_str())
  2175                 .unwrap(),
  2176             ""
  2177         );
  2178     }
  2179  
  2180     #[test]
  2181     fn codex_unknown_payload_type_is_canonicalized() {
  2182         let record = json!({
  2183             "timestamp": "2026-02-15T03:50:50.838Z",
  2184             "type": "response_item",
  2185             "payload": {
  2186                 "type": "brand_new_payload_type",
  2187                 "body": "x"
  2188             }
  2189         });
  2190  
  2191         let out = normalize_record(
  2192             &record,
  2193             "codex",
  2194             "codex",
  2195             "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  2196             1,
  2197             1,
  2198             5,
  2199             5,
  2200             "",
  2201             "",
  2202         )
  2203         .expect("record should normalize");
  2204  
  2205         let row = out.event_rows[0].as_object().unwrap();
  2206         assert_eq!(row.get("event_kind").unwrap().as_str().unwrap(), "unknown");
  2207         assert_eq!(
  2208             row.get("payload_type").unwrap().as_str().unwrap(),
  2209             "unknown"
  2210         );
  2211     }
  2212  
  2213     #[test]
  2214     fn codex_event_msg_known_operational_payload_type_is_preserved() {
  2215         let record = json!({
  2216             "timestamp": "2026-02-15T03:50:50.838Z",
  2217             "type": "event_msg",
  2218             "payload": {
  2219                 "type": "task_started",
  2220                 "status": "in_progress"
  2221             }
  2222         });
  2223  
  2224         let out = normalize_record(
  2225             &record,
  2226             "codex",
  2227             "codex",
  2228             "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  2229             1,
  2230             1,
  2231             6,
  2232             6,
  2233             "",
  2234             "",
  2235         )
  2236         .expect("record should normalize");
  2237  
  2238         let row = out.event_rows[0].as_object().unwrap();
  2239         assert_eq!(
  2240             row.get("event_kind").unwrap().as_str().unwrap(),
  2241             "event_msg"
  2242         );
  2243         assert_eq!(
  2244             row.get("payload_type").unwrap().as_str().unwrap(),
  2245             "task_started"
  2246         );
  2247     }
  2248  
  2249     #[test]
  2250     fn claude_progress_unknown_payload_type_moves_to_unknown_and_preserves_op_kind() {
  2251         let record = json!({
  2252             "timestamp": "2026-02-15T03:50:50.838Z",
  2253             "type": "progress",
  2254             "sessionId": "7c666c01-d38e-4658-8650-854ffb5b626e",
  2255             "data": {
  2256                 "type": "provider_extension_step"
  2257             },
  2258             "status": "ok"
  2259         });
  2260  
  2261         let out = normalize_record(
  2262             &record,
  2263             "claude",
  2264             "claude",
  2265             "/Users/eric/.claude/projects/p1/s1.jsonl",
  2266             1,
  2267             1,
  2268             6,
  2269             6,
  2270             "",
  2271             "",
  2272         )
  2273         .expect("record should normalize");
  2274  
  2275         let row = out.event_rows[0].as_object().unwrap();
  2276         assert_eq!(row.get("event_kind").unwrap().as_str().unwrap(), "progress");
  2277         assert_eq!(
  2278             row.get("payload_type").unwrap().as_str().unwrap(),
  2279             "unknown"
  2280         );
  2281         assert_eq!(
  2282             row.get("op_kind").unwrap().as_str().unwrap(),
  2283             "provider_extension_step"
  2284         );
  2285     }
  2286  
  2287     #[test]
  2288     fn link_type_is_canonicalized_to_domain() {
  2289         let ctx = RecordContext {
  2290             source_name: "codex",
  2291             provider: "codex",
  2292             session_id: "s1",
  2293             session_date: "2026-02-15",
  2294             source_file: "/tmp/s1.jsonl",
  2295             source_inode: 1,
  2296             source_generation: 1,
  2297             source_line_no: 1,
  2298             source_offset: 1,
  2299             record_ts: "2026-02-15T03:50:50.838Z",
  2300             event_ts: "2026-02-15 03:50:50.838",
  2301         };
  2302  
  2303         let link = build_link_row(&ctx, "e1", "e2", "", "new_link_type", "{}");
  2304         let link_obj = link.as_object().unwrap();
  2305         assert_eq!(
  2306             link_obj.get("link_type").unwrap().as_str().unwrap(),
  2307             "unknown"
  2308         );
  2309     }
  2310 }