Skip to content

rust/ingestor/src/normalize.rs


     1 use crate::model::NormalizedRecord;
     2 use chrono::{DateTime, Utc};
     3 use regex::Regex;
     4 use serde_json::{json, Map, Value};
     5 use sha2::{Digest, Sha256};
     6 use std::sync::OnceLock;
     7 use std::time::{SystemTime, UNIX_EPOCH};
     8  
/// Maximum number of characters kept for long text columns (`text_content`
/// and the tool-row `input_json` / `output_json` / `output_text` columns);
/// longer values are cut to this many characters.
const TEXT_LIMIT: usize = 200_000;
/// Maximum number of characters kept for the short preview columns
/// (`text_preview`, `input_preview`, `output_preview`).
const PREVIEW_LIMIT: usize = 320;
    11  
    12 fn session_id_re() -> &'static Regex {
    13     static SESSION_ID_RE: OnceLock<Regex> = OnceLock::new();
    14     SESSION_ID_RE.get_or_init(|| {
    15         Regex::new(
    16             r"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})$",
    17         )
    18         .expect("valid session id regex")
    19     })
    20 }
    21  
    22 fn session_date_re() -> &'static Regex {
    23     static SESSION_DATE_RE: OnceLock<Regex> = OnceLock::new();
    24     SESSION_DATE_RE.get_or_init(|| {
    25         Regex::new(r"/(?:sessions|projects)/(\d{4})/(\d{2})/(\d{2})/")
    26             .expect("valid session date regex")
    27     })
    28 }
    29  
    30 fn to_str(value: Option<&Value>) -> String {
    31     match value {
    32         None | Some(Value::Null) => String::new(),
    33         Some(Value::String(s)) => s.clone(),
    34         Some(other) => other.to_string(),
    35     }
    36 }
    37  
    38 fn to_u32(value: Option<&Value>) -> u32 {
    39     match value {
    40         Some(Value::Number(n)) => n.as_u64().unwrap_or(0).min(u32::MAX as u64) as u32,
    41         Some(Value::String(s)) => s.parse::<u64>().unwrap_or(0).min(u32::MAX as u64) as u32,
    42         _ => 0,
    43     }
    44 }
    45  
    46 fn to_u16(value: Option<&Value>) -> u16 {
    47     to_u32(value).min(u16::MAX as u32) as u16
    48 }
    49  
    50 fn to_u8_bool(value: Option<&Value>) -> u8 {
    51     match value {
    52         Some(Value::Bool(v)) => {
    53             if *v {
    54                 1
    55             } else {
    56                 0
    57             }
    58         }
    59         Some(Value::Number(v)) => {
    60             if v.as_i64().unwrap_or(0) != 0 {
    61                 1
    62             } else {
    63                 0
    64             }
    65         }
    66         Some(Value::String(s)) => {
    67             let lower = s.to_ascii_lowercase();
    68             if lower == "true" || lower == "1" {
    69                 1
    70             } else {
    71                 0
    72             }
    73         }
    74         _ => 0,
    75     }
    76 }
    77  
    78 fn canonicalize_model(provider: &str, raw_model: &str) -> String {
    79     let mut model = raw_model.trim().to_ascii_lowercase();
    80     if model.is_empty() {
    81         return String::new();
    82     }
    83  
    84     model = model.replace(' ', "-");
    85  
    86     if provider == "codex" && model == "codex" {
    87         return "gpt-5.3-codex-xhigh".to_string();
    88     }
    89  
    90     model
    91 }
    92  
    93 fn resolve_model_hint(event_rows: &[Value], provider: &str, fallback: &str) -> String {
    94     for row in event_rows.iter().rev() {
    95         if let Some(model) = row.get("model").and_then(Value::as_str) {
    96             let normalized = canonicalize_model(provider, model);
    97             if !normalized.is_empty() {
    98                 return normalized;
    99             }
   100         }
   101     }
   102  
   103     canonicalize_model(provider, fallback)
   104 }
   105  
   106 fn compact_json(value: &Value) -> String {
   107     serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string())
   108 }
   109  
   110 fn truncate_chars(input: &str, max_chars: usize) -> String {
   111     if input.chars().count() <= max_chars {
   112         input.to_string()
   113     } else {
   114         input.chars().take(max_chars).collect()
   115     }
   116 }
   117  
   118 fn extract_message_text(content: &Value) -> String {
   119     fn walk(node: &Value, out: &mut Vec<String>) {
   120         match node {
   121             Value::String(s) => {
   122                 if !s.trim().is_empty() {
   123                     out.push(s.clone());
   124                 }
   125             }
   126             Value::Array(items) => {
   127                 for item in items {
   128                     walk(item, out);
   129                 }
   130             }
   131             Value::Object(map) => {
   132                 for key in ["text", "message", "output", "thinking", "summary"] {
   133                     if let Some(Value::String(s)) = map.get(key) {
   134                         if !s.trim().is_empty() {
   135                             out.push(s.clone());
   136                         }
   137                     }
   138                 }
   139  
   140                 for key in ["content", "text_elements", "input"] {
   141                     if let Some(value) = map.get(key) {
   142                         walk(value, out);
   143                     }
   144                 }
   145             }
   146             _ => {}
   147         }
   148     }
   149  
   150     let mut chunks = Vec::<String>::new();
   151     walk(content, &mut chunks);
   152     truncate_chars(&chunks.join("\n"), TEXT_LIMIT)
   153 }
   154  
   155 fn extract_content_types(content: &Value) -> Vec<String> {
   156     if let Value::Array(items) = content {
   157         let mut out = Vec::<String>::new();
   158         for item in items {
   159             if let Some(t) = item.get("type").and_then(|v| v.as_str()) {
   160                 if !t.is_empty() {
   161                     out.push(t.to_string());
   162                 }
   163             }
   164         }
   165         out.sort();
   166         out.dedup();
   167         return out;
   168     }
   169     Vec::new()
   170 }
   171  
   172 pub fn infer_session_id_from_file(source_file: &str) -> String {
   173     let stem = std::path::Path::new(source_file)
   174         .file_stem()
   175         .and_then(|s| s.to_str())
   176         .unwrap_or_default();
   177  
   178     session_id_re()
   179         .captures(stem)
   180         .and_then(|cap| cap.get(1).map(|m| m.as_str().to_string()))
   181         .unwrap_or_default()
   182 }
   183  
   184 pub fn infer_session_date_from_file(source_file: &str, record_ts: &str) -> String {
   185     if let Some(cap) = session_date_re().captures(source_file) {
   186         return format!("{}-{}-{}", &cap[1], &cap[2], &cap[3]);
   187     }
   188  
   189     parse_event_ts(record_ts)
   190         .split(' ')
   191         .next()
   192         .unwrap_or("1970-01-01")
   193         .to_string()
   194 }
   195  
   196 fn parse_event_ts(record_ts: &str) -> String {
   197     if let Ok(dt) = DateTime::parse_from_rfc3339(record_ts) {
   198         return dt
   199             .with_timezone(&Utc)
   200             .format("%Y-%m-%d %H:%M:%S%.3f")
   201             .to_string();
   202     }
   203  
   204     Utc::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string()
   205 }
   206  
   207 fn event_uid(
   208     source_file: &str,
   209     source_generation: u32,
   210     source_line_no: u64,
   211     source_offset: u64,
   212     record_fingerprint: &str,
   213     suffix: &str,
   214 ) -> String {
   215     let material = format!(
   216         "{}|{}|{}|{}|{}|{}",
   217         source_file, source_generation, source_line_no, source_offset, record_fingerprint, suffix
   218     );
   219  
   220     let mut hasher = Sha256::new();
   221     hasher.update(material.as_bytes());
   222     format!("{:x}", hasher.finalize())
   223 }
   224  
   225 fn event_version() -> u64 {
   226     let now = SystemTime::now()
   227         .duration_since(UNIX_EPOCH)
   228         .unwrap_or_default()
   229         .as_millis();
   230     now as u64
   231 }
   232  
   233 fn raw_hash(raw_json: &str) -> u64 {
   234     let mut hasher = Sha256::new();
   235     hasher.update(raw_json.as_bytes());
   236     let digest = hasher.finalize();
   237     let hex = format!("{:x}", digest);
   238     u64::from_str_radix(&hex[..16], 16).unwrap_or(0)
   239 }
   240  
   241 fn io_hash(input_json: &str, output_json: &str) -> u64 {
   242     raw_hash(&format!("{}\n{}", input_json, output_json))
   243 }
   244  
/// Provenance shared by every row (event, link, tool) built from one source
/// record. All string fields are borrowed from the caller.
struct RecordContext<'a> {
    /// Copied into the `source_name` column of every row.
    source_name: &'a str,
    /// Copied into the `provider` column; the codex path compares model
    /// aliases against the provider name "codex".
    provider: &'a str,
    /// Copied into the `session_id` column (presumably from
    /// `infer_session_id_from_file` — confirm at the call site).
    session_id: &'a str,
    /// Copied into `session_date` (presumably `YYYY-MM-DD` from
    /// `infer_session_date_from_file` — confirm).
    session_date: &'a str,
    /// Source path; part of `source_ref` and of the `event_uid` hash material.
    source_file: &'a str,
    /// Copied into the `source_inode` column.
    source_inode: u64,
    /// Part of `source_ref` and of the `event_uid` hash material.
    /// NOTE(review): presumably bumped when the file is rotated or
    /// truncated — confirm.
    source_generation: u32,
    /// Line number of the record; part of `source_ref` and `event_uid`.
    source_line_no: u64,
    /// Part of the `event_uid` hash material (presumably a byte offset —
    /// confirm).
    source_offset: u64,
    /// Timestamp string as read from the record, copied into `record_ts`.
    record_ts: &'a str,
    /// Normalized timestamp string copied into `event_ts` (presumably the
    /// output of `parse_event_ts` — confirm).
    event_ts: &'a str,
}
   258  
   259 fn base_event_obj(
   260     ctx: &RecordContext<'_>,
   261     event_uid: &str,
   262     event_kind: &str,
   263     payload_type: &str,
   264     actor_kind: &str,
   265     text_content: &str,
   266     payload_json: &str,
   267 ) -> Map<String, Value> {
   268     let text_content = truncate_chars(text_content, TEXT_LIMIT);
   269     let mut obj = Map::<String, Value>::new();
   270     obj.insert(
   271         "event_uid".to_string(),
   272         Value::String(event_uid.to_string()),
   273     );
   274     obj.insert(
   275         "session_id".to_string(),
   276         Value::String(ctx.session_id.to_string()),
   277     );
   278     obj.insert(
   279         "session_date".to_string(),
   280         Value::String(ctx.session_date.to_string()),
   281     );
   282     obj.insert(
   283         "source_name".to_string(),
   284         Value::String(ctx.source_name.to_string()),
   285     );
   286     obj.insert(
   287         "provider".to_string(),
   288         Value::String(ctx.provider.to_string()),
   289     );
   290     obj.insert(
   291         "source_file".to_string(),
   292         Value::String(ctx.source_file.to_string()),
   293     );
   294     obj.insert("source_inode".to_string(), json!(ctx.source_inode));
   295     obj.insert(
   296         "source_generation".to_string(),
   297         json!(ctx.source_generation),
   298     );
   299     obj.insert("source_line_no".to_string(), json!(ctx.source_line_no));
   300     obj.insert("source_offset".to_string(), json!(ctx.source_offset));
   301     obj.insert(
   302         "source_ref".to_string(),
   303         Value::String(format!(
   304             "{}:{}:{}",
   305             ctx.source_file, ctx.source_generation, ctx.source_line_no
   306         )),
   307     );
   308     obj.insert(
   309         "record_ts".to_string(),
   310         Value::String(ctx.record_ts.to_string()),
   311     );
   312     obj.insert(
   313         "event_ts".to_string(),
   314         Value::String(ctx.event_ts.to_string()),
   315     );
   316     obj.insert(
   317         "event_kind".to_string(),
   318         Value::String(event_kind.to_string()),
   319     );
   320     obj.insert(
   321         "actor_kind".to_string(),
   322         Value::String(actor_kind.to_string()),
   323     );
   324     obj.insert(
   325         "payload_type".to_string(),
   326         Value::String(payload_type.to_string()),
   327     );
   328     obj.insert("op_kind".to_string(), Value::String(String::new()));
   329     obj.insert("op_status".to_string(), Value::String(String::new()));
   330     obj.insert("request_id".to_string(), Value::String(String::new()));
   331     obj.insert("trace_id".to_string(), Value::String(String::new()));
   332     obj.insert("turn_index".to_string(), json!(0u32));
   333     obj.insert("item_id".to_string(), Value::String(String::new()));
   334     obj.insert("tool_call_id".to_string(), Value::String(String::new()));
   335     obj.insert(
   336         "parent_tool_call_id".to_string(),
   337         Value::String(String::new()),
   338     );
   339     obj.insert("origin_event_id".to_string(), Value::String(String::new()));
   340     obj.insert(
   341         "origin_tool_call_id".to_string(),
   342         Value::String(String::new()),
   343     );
   344     obj.insert("tool_name".to_string(), Value::String(String::new()));
   345     obj.insert("tool_phase".to_string(), Value::String(String::new()));
   346     obj.insert("tool_error".to_string(), json!(0u8));
   347     obj.insert("agent_run_id".to_string(), Value::String(String::new()));
   348     obj.insert("agent_label".to_string(), Value::String(String::new()));
   349     obj.insert("coord_group_id".to_string(), Value::String(String::new()));
   350     obj.insert(
   351         "coord_group_label".to_string(),
   352         Value::String(String::new()),
   353     );
   354     obj.insert("is_substream".to_string(), json!(0u8));
   355     obj.insert("model".to_string(), Value::String(String::new()));
   356     obj.insert("input_tokens".to_string(), json!(0u32));
   357     obj.insert("output_tokens".to_string(), json!(0u32));
   358     obj.insert("cache_read_tokens".to_string(), json!(0u32));
   359     obj.insert("cache_write_tokens".to_string(), json!(0u32));
   360     obj.insert("latency_ms".to_string(), json!(0u32));
   361     obj.insert("retry_count".to_string(), json!(0u16));
   362     obj.insert("service_tier".to_string(), Value::String(String::new()));
   363     obj.insert("content_types".to_string(), json!([]));
   364     obj.insert("has_reasoning".to_string(), json!(0u8));
   365     obj.insert(
   366         "text_content".to_string(),
   367         Value::String(text_content.clone()),
   368     );
   369     obj.insert(
   370         "text_preview".to_string(),
   371         Value::String(truncate_chars(&text_content, PREVIEW_LIMIT)),
   372     );
   373     obj.insert(
   374         "payload_json".to_string(),
   375         Value::String(payload_json.to_string()),
   376     );
   377     obj.insert("token_usage_json".to_string(), Value::String(String::new()));
   378     obj.insert("event_version".to_string(), json!(event_version()));
   379     obj
   380 }
   381  
   382 fn build_link_row(
   383     ctx: &RecordContext<'_>,
   384     event_uid: &str,
   385     linked_event_uid: &str,
   386     link_type: &str,
   387     metadata_json: &str,
   388 ) -> Value {
   389     json!({
   390         "event_uid": event_uid,
   391         "linked_event_uid": linked_event_uid,
   392         "link_type": link_type,
   393         "session_id": ctx.session_id,
   394         "provider": ctx.provider,
   395         "source_name": ctx.source_name,
   396         "metadata_json": metadata_json,
   397         "event_version": event_version(),
   398     })
   399 }
   400  
   401 fn build_tool_row(
   402     ctx: &RecordContext<'_>,
   403     event_uid: &str,
   404     tool_call_id: &str,
   405     parent_tool_call_id: &str,
   406     tool_name: &str,
   407     tool_phase: &str,
   408     tool_error: u8,
   409     input_json: &str,
   410     output_json: &str,
   411     output_text: &str,
   412 ) -> Value {
   413     let input_json = truncate_chars(input_json, TEXT_LIMIT);
   414     let output_json = truncate_chars(output_json, TEXT_LIMIT);
   415     let output_text = truncate_chars(output_text, TEXT_LIMIT);
   416  
   417     json!({
   418         "event_uid": event_uid,
   419         "session_id": ctx.session_id,
   420         "provider": ctx.provider,
   421         "source_name": ctx.source_name,
   422         "tool_call_id": tool_call_id,
   423         "parent_tool_call_id": parent_tool_call_id,
   424         "tool_name": tool_name,
   425         "tool_phase": tool_phase,
   426         "tool_error": tool_error,
   427         "input_json": input_json,
   428         "output_json": output_json,
   429         "output_text": output_text,
   430         "input_bytes": input_json.len() as u32,
   431         "output_bytes": output_json.len() as u32,
   432         "input_preview": truncate_chars(&input_json, PREVIEW_LIMIT),
   433         "output_preview": truncate_chars(&output_text, PREVIEW_LIMIT),
   434         "io_hash": io_hash(&input_json, &output_json),
   435         "source_ref": format!("{}:{}:{}", ctx.source_file, ctx.source_generation, ctx.source_line_no),
   436         "event_version": event_version(),
   437     })
   438 }
   439  
   440 fn normalize_codex_event(
   441     record: &Value,
   442     ctx: &RecordContext<'_>,
   443     top_type: &str,
   444     base_uid: &str,
   445     model_hint: &str,
   446 ) -> (Vec<Value>, Vec<Value>, Vec<Value>) {
   447     let mut events = Vec::<Value>::new();
   448     let mut links = Vec::<Value>::new();
   449     let mut tools = Vec::<Value>::new();
   450  
   451     let payload = record.get("payload").cloned().unwrap_or(Value::Null);
   452     let payload_obj = payload.as_object().cloned().unwrap_or_else(Map::new);
   453     let payload_json = compact_json(&Value::Object(payload_obj.clone()));
   454  
   455     let push_parent_link = |links: &mut Vec<Value>, uid: &str, parent: &str| {
   456         if !parent.is_empty() {
   457             links.push(build_link_row(ctx, uid, parent, "parent_event", "{}"));
   458         }
   459     };
   460  
   461     match top_type {
   462         "session_meta" => {
   463             let mut row = base_event_obj(
   464                 ctx,
   465                 base_uid,
   466                 "session_meta",
   467                 "session_meta",
   468                 "system",
   469                 "",
   470                 &payload_json,
   471             );
   472             row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
   473             events.push(Value::Object(row));
   474         }
   475         "turn_context" => {
   476             let mut row = base_event_obj(
   477                 ctx,
   478                 base_uid,
   479                 "turn_context",
   480                 "turn_context",
   481                 "system",
   482                 "",
   483                 &payload_json,
   484             );
   485             row.insert(
   486                 "turn_index".to_string(),
   487                 json!(to_u32(payload_obj.get("turn_id"))),
   488             );
   489             let turn_id = to_str(payload_obj.get("turn_id"));
   490             if !turn_id.is_empty() {
   491                 row.insert("request_id".to_string(), json!(turn_id.clone()));
   492                 row.insert("item_id".to_string(), json!(turn_id));
   493             }
   494             let model = canonicalize_model("codex", &to_str(payload_obj.get("model")));
   495             if !model.is_empty() {
   496                 row.insert("model".to_string(), json!(model));
   497             }
   498             events.push(Value::Object(row));
   499         }
   500         "response_item" => {
   501             let payload_type = to_str(payload_obj.get("type"));
   502             match payload_type.as_str() {
   503                 "message" => {
   504                     let role = to_str(payload_obj.get("role"));
   505                     let content = payload_obj.get("content").cloned().unwrap_or(Value::Null);
   506                     let text = extract_message_text(&content);
   507                     let mut row = base_event_obj(
   508                         ctx,
   509                         base_uid,
   510                         "message",
   511                         "message",
   512                         if role.is_empty() {
   513                             "assistant"
   514                         } else {
   515                             role.as_str()
   516                         },
   517                         &text,
   518                         &payload_json,
   519                     );
   520                     row.insert(
   521                         "content_types".to_string(),
   522                         json!(extract_content_types(&content)),
   523                     );
   524                     row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
   525                     row.insert(
   526                         "op_status".to_string(),
   527                         json!(to_str(payload_obj.get("phase"))),
   528                     );
   529                     events.push(Value::Object(row));
   530                 }
   531                 "function_call" => {
   532                     let args = to_str(payload_obj.get("arguments"));
   533                     let call_id = to_str(payload_obj.get("call_id"));
   534                     let name = to_str(payload_obj.get("name"));
   535                     let mut row = base_event_obj(
   536                         ctx,
   537                         base_uid,
   538                         "tool_call",
   539                         "function_call",
   540                         "assistant",
   541                         &args,
   542                         &payload_json,
   543                     );
   544                     row.insert("tool_call_id".to_string(), json!(call_id.clone()));
   545                     row.insert("tool_name".to_string(), json!(name.clone()));
   546                     events.push(Value::Object(row));
   547  
   548                     tools.push(build_tool_row(
   549                         ctx, base_uid, &call_id, "", &name, "request", 0, &args, "", "",
   550                     ));
   551                 }
   552                 "function_call_output" => {
   553                     let output = to_str(payload_obj.get("output"));
   554                     let call_id = to_str(payload_obj.get("call_id"));
   555                     let mut row = base_event_obj(
   556                         ctx,
   557                         base_uid,
   558                         "tool_result",
   559                         "function_call_output",
   560                         "tool",
   561                         &output,
   562                         &payload_json,
   563                     );
   564                     row.insert("tool_call_id".to_string(), json!(call_id.clone()));
   565                     events.push(Value::Object(row));
   566  
   567                     tools.push(build_tool_row(
   568                         ctx,
   569                         base_uid,
   570                         &call_id,
   571                         "",
   572                         "",
   573                         "response",
   574                         0,
   575                         "",
   576                         &compact_json(payload_obj.get("output").unwrap_or(&Value::Null)),
   577                         &output,
   578                     ));
   579                 }
   580                 "custom_tool_call" => {
   581                     let input = to_str(payload_obj.get("input"));
   582                     let call_id = to_str(payload_obj.get("call_id"));
   583                     let name = to_str(payload_obj.get("name"));
   584                     let status = to_str(payload_obj.get("status"));
   585                     let mut row = base_event_obj(
   586                         ctx,
   587                         base_uid,
   588                         "tool_call",
   589                         "custom_tool_call",
   590                         "assistant",
   591                         &input,
   592                         &payload_json,
   593                     );
   594                     row.insert("tool_call_id".to_string(), json!(call_id.clone()));
   595                     row.insert("tool_name".to_string(), json!(name.clone()));
   596                     row.insert("op_status".to_string(), json!(status));
   597                     events.push(Value::Object(row));
   598  
   599                     tools.push(build_tool_row(
   600                         ctx, base_uid, &call_id, "", &name, "request", 0, &input, "", "",
   601                     ));
   602                 }
   603                 "custom_tool_call_output" => {
   604                     let output = to_str(payload_obj.get("output"));
   605                     let call_id = to_str(payload_obj.get("call_id"));
   606                     let status = to_str(payload_obj.get("status"));
   607                     let output_json = serde_json::from_str::<Value>(&output)
   608                         .map(|parsed| compact_json(&parsed))
   609                         .unwrap_or_else(|_| {
   610                             compact_json(payload_obj.get("output").unwrap_or(&Value::Null))
   611                         });
   612  
   613                     let mut row = base_event_obj(
   614                         ctx,
   615                         base_uid,
   616                         "tool_result",
   617                         "custom_tool_call_output",
   618                         "tool",
   619                         &output,
   620                         &payload_json,
   621                     );
   622                     row.insert("tool_call_id".to_string(), json!(call_id.clone()));
   623                     row.insert("op_status".to_string(), json!(status));
   624                     events.push(Value::Object(row));
   625  
   626                     tools.push(build_tool_row(
   627                         ctx,
   628                         base_uid,
   629                         &call_id,
   630                         "",
   631                         "",
   632                         "response",
   633                         0,
   634                         "",
   635                         &output_json,
   636                         &output,
   637                     ));
   638                 }
   639                 "web_search_call" => {
   640                     let action = payload_obj.get("action").cloned().unwrap_or(Value::Null);
   641                     let action_type = to_str(action.get("type"));
   642                     let status = to_str(payload_obj.get("status"));
   643                     let mut row = base_event_obj(
   644                         ctx,
   645                         base_uid,
   646                         "tool_call",
   647                         "web_search_call",
   648                         "assistant",
   649                         &extract_message_text(&action),
   650                         &payload_json,
   651                     );
   652                     row.insert("tool_name".to_string(), json!("web_search"));
   653                     row.insert("op_kind".to_string(), json!(action_type));
   654                     row.insert("op_status".to_string(), json!(status.clone()));
   655                     row.insert("tool_phase".to_string(), json!(status));
   656                     events.push(Value::Object(row));
   657                 }
   658                 "reasoning" => {
   659                     let summary = payload_obj.get("summary").cloned().unwrap_or(Value::Null);
   660                     let mut row = base_event_obj(
   661                         ctx,
   662                         base_uid,
   663                         "reasoning",
   664                         "reasoning",
   665                         "assistant",
   666                         &extract_message_text(&summary),
   667                         &payload_json,
   668                     );
   669                     row.insert("has_reasoning".to_string(), json!(1u8));
   670                     row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
   671                     events.push(Value::Object(row));
   672                 }
   673                 _ => {
   674                     events.push(Value::Object(base_event_obj(
   675                         ctx,
   676                         base_uid,
   677                         "unknown",
   678                         if payload_type.is_empty() {
   679                             "response_item"
   680                         } else {
   681                             payload_type.as_str()
   682                         },
   683                         "system",
   684                         &extract_message_text(&payload),
   685                         &payload_json,
   686                     )));
   687                 }
   688             }
   689         }
   690         "event_msg" => {
   691             let payload_type = to_str(payload_obj.get("type"));
   692             let actor = match payload_type.as_str() {
   693                 "user_message" => "user",
   694                 "agent_message" | "agent_reasoning" => "assistant",
   695                 _ => "system",
   696             };
   697             let mut row = base_event_obj(
   698                 ctx,
   699                 base_uid,
   700                 "event_msg",
   701                 if payload_type.is_empty() {
   702                     "event_msg"
   703                 } else {
   704                     payload_type.as_str()
   705                 },
   706                 actor,
   707                 &extract_message_text(&payload),
   708                 &payload_json,
   709             );
   710             let turn_id = to_str(payload_obj.get("turn_id"));
   711             if !turn_id.is_empty() {
   712                 row.insert("request_id".to_string(), json!(turn_id.clone()));
   713                 row.insert("item_id".to_string(), json!(turn_id));
   714             }
   715             let status = to_str(payload_obj.get("status"));
   716             if !status.is_empty() {
   717                 row.insert("op_status".to_string(), json!(status));
   718             }
   719             if payload_type == "token_count" {
   720                 let usage = payload_obj
   721                     .get("info")
   722                     .and_then(|v| v.get("last_token_usage"));
   723                 let input_tokens = to_u32(usage.and_then(|v| v.get("input_tokens")));
   724                 let output_tokens = to_u32(usage.and_then(|v| v.get("output_tokens")));
   725                 let cache_read_tokens = to_u32(
   726                     usage
   727                         .and_then(|v| v.get("cached_input_tokens"))
   728                         .or_else(|| usage.and_then(|v| v.get("cache_read_input_tokens"))),
   729                 );
   730                 let cache_write_tokens = to_u32(
   731                     usage
   732                         .and_then(|v| v.get("cache_creation_input_tokens"))
   733                         .or_else(|| usage.and_then(|v| v.get("cache_write_input_tokens"))),
   734                 );
   735  
   736                 let model = to_str(
   737                     payload_obj
   738                         .get("rate_limits")
   739                         .and_then(|v| v.get("limit_name")),
   740                 );
   741                 let fallback_model = to_str(payload_obj.get("model"));
   742                 let fallback_limit_id = to_str(
   743                     payload_obj
   744                         .get("rate_limits")
   745                         .and_then(|v| v.get("limit_id")),
   746                 );
   747                 let resolved_model = if !model.is_empty() {
   748                     canonicalize_model("codex", &model)
   749                 } else if !fallback_model.is_empty() {
   750                     canonicalize_model("codex", &fallback_model)
   751                 } else if !fallback_limit_id.is_empty() {
   752                     canonicalize_model("codex", &fallback_limit_id)
   753                 } else {
   754                     canonicalize_model("codex", model_hint)
   755                 };
   756  
   757                 row.insert("input_tokens".to_string(), json!(input_tokens));
   758                 row.insert("output_tokens".to_string(), json!(output_tokens));
   759                 row.insert("cache_read_tokens".to_string(), json!(cache_read_tokens));
   760                 row.insert("cache_write_tokens".to_string(), json!(cache_write_tokens));
   761                 if !resolved_model.is_empty() {
   762                     row.insert("model".to_string(), json!(resolved_model));
   763                 }
   764                 row.insert(
   765                     "service_tier".to_string(),
   766                     json!(to_str(payload_obj.get("rate_limits").and_then(|v| v.get("plan_type")))),
   767                 );
   768                 row.insert(
   769                     "token_usage_json".to_string(),
   770                     json!(compact_json(&payload)),
   771                 );
   772             } else if payload_type == "agent_reasoning" {
   773                 row.insert("has_reasoning".to_string(), json!(1u8));
   774             }
   775             events.push(Value::Object(row));
   776         }
   777         "compacted" => {
   778             events.push(Value::Object(base_event_obj(
   779                 ctx,
   780                 base_uid,
   781                 "compacted_raw",
   782                 "compacted",
   783                 "system",
   784                 "",
   785                 &payload_json,
   786             )));
   787  
   788             if let Some(Value::Array(items)) = payload_obj.get("replacement_history") {
   789                 for (idx, item) in items.iter().enumerate() {
   790                     let item_uid = event_uid(
   791                         ctx.source_file,
   792                         ctx.source_generation,
   793                         ctx.source_line_no,
   794                         ctx.source_offset,
   795                         &compact_json(item),
   796                         &format!("compacted:{}", idx),
   797                     );
   798                     let item_type = to_str(item.get("type"));
   799  
   800                     let (kind, payload_type, actor, text) = match item_type.as_str() {
   801                         "message" => (
   802                             "message",
   803                             "message",
   804                             to_str(item.get("role")),
   805                             extract_message_text(item.get("content").unwrap_or(&Value::Null)),
   806                         ),
   807                         "function_call" => (
   808                             "tool_call",
   809                             "function_call",
   810                             "assistant".to_string(),
   811                             to_str(item.get("arguments")),
   812                         ),
   813                         "function_call_output" => (
   814                             "tool_result",
   815                             "function_call_output",
   816                             "tool".to_string(),
   817                             to_str(item.get("output")),
   818                         ),
   819                         "reasoning" => (
   820                             "reasoning",
   821                             "reasoning",
   822                             "assistant".to_string(),
   823                             extract_message_text(item.get("summary").unwrap_or(&Value::Null)),
   824                         ),
   825                         _ => (
   826                             "unknown",
   827                             if item_type.is_empty() {
   828                                 "unknown"
   829                             } else {
   830                                 item_type.as_str()
   831                             },
   832                             "system".to_string(),
   833                             extract_message_text(item),
   834                         ),
   835                     };
   836  
   837                     let mut row = base_event_obj(
   838                         ctx,
   839                         &item_uid,
   840                         kind,
   841                         payload_type,
   842                         if actor.is_empty() {
   843                             "assistant"
   844                         } else {
   845                             actor.as_str()
   846                         },
   847                         &text,
   848                         &compact_json(item),
   849                     );
   850                     row.insert("origin_event_id".to_string(), json!(base_uid));
   851                     events.push(Value::Object(row));
   852  
   853                     links.push(build_link_row(
   854                         ctx,
   855                         &item_uid,
   856                         base_uid,
   857                         "compacted_parent",
   858                         "{}",
   859                     ));
   860                 }
   861             }
   862         }
   863         "message" | "function_call" | "function_call_output" | "reasoning" => {
   864             let event = if top_type == "message" {
   865                 let role = to_str(record.get("role"));
   866                 let text = extract_message_text(record.get("content").unwrap_or(&Value::Null));
   867                 let mut row = base_event_obj(
   868                     ctx,
   869                     base_uid,
   870                     "message",
   871                     "message",
   872                     if role.is_empty() {
   873                         "assistant"
   874                     } else {
   875                         role.as_str()
   876                     },
   877                     &text,
   878                     &compact_json(record),
   879                 );
   880                 row.insert(
   881                     "content_types".to_string(),
   882                     json!(extract_content_types(
   883                         record.get("content").unwrap_or(&Value::Null)
   884                     )),
   885                 );
   886                 Value::Object(row)
   887             } else if top_type == "function_call" {
   888                 let args = to_str(record.get("arguments"));
   889                 let call_id = to_str(record.get("call_id"));
   890                 let name = to_str(record.get("name"));
   891                 let mut row = base_event_obj(
   892                     ctx,
   893                     base_uid,
   894                     "tool_call",
   895                     "function_call",
   896                     "assistant",
   897                     &args,
   898                     &compact_json(record),
   899                 );
   900                 row.insert("tool_call_id".to_string(), json!(call_id.clone()));
   901                 row.insert("tool_name".to_string(), json!(name.clone()));
   902                 tools.push(build_tool_row(
   903                     ctx, base_uid, &call_id, "", &name, "request", 0, &args, "", "",
   904                 ));
   905                 Value::Object(row)
   906             } else if top_type == "function_call_output" {
   907                 let output = to_str(record.get("output"));
   908                 let call_id = to_str(record.get("call_id"));
   909                 let mut row = base_event_obj(
   910                     ctx,
   911                     base_uid,
   912                     "tool_result",
   913                     "function_call_output",
   914                     "tool",
   915                     &output,
   916                     &compact_json(record),
   917                 );
   918                 row.insert("tool_call_id".to_string(), json!(call_id.clone()));
   919                 tools.push(build_tool_row(
   920                     ctx,
   921                     base_uid,
   922                     &call_id,
   923                     "",
   924                     "",
   925                     "response",
   926                     0,
   927                     "",
   928                     &compact_json(record.get("output").unwrap_or(&Value::Null)),
   929                     &output,
   930                 ));
   931                 Value::Object(row)
   932             } else {
   933                 let summary = record.get("summary").cloned().unwrap_or(Value::Null);
   934                 let mut row = base_event_obj(
   935                     ctx,
   936                     base_uid,
   937                     "reasoning",
   938                     "reasoning",
   939                     "assistant",
   940                     &extract_message_text(&summary),
   941                     &compact_json(record),
   942                 );
   943                 row.insert("has_reasoning".to_string(), json!(1u8));
   944                 Value::Object(row)
   945             };
   946  
   947             events.push(event);
   948         }
   949         _ => {
   950             events.push(Value::Object(base_event_obj(
   951                 ctx,
   952                 base_uid,
   953                 "unknown",
   954                 if top_type.is_empty() {
   955                     "unknown"
   956                 } else {
   957                     top_type
   958                 },
   959                 "system",
   960                 &extract_message_text(record),
   961                 &compact_json(record),
   962             )));
   963         }
   964     }
   965  
   966     let payload_model = canonicalize_model("codex", &to_str(payload_obj.get("model")));
   967     let inherited_model = canonicalize_model("codex", model_hint);
   968     for event in &mut events {
   969         if let Some(row) = event.as_object_mut() {
   970             let row_model = canonicalize_model("codex", &to_str(row.get("model")));
   971             let resolved_model = if !row_model.is_empty() {
   972                 row_model
   973             } else if !payload_model.is_empty() {
   974                 payload_model.clone()
   975             } else {
   976                 inherited_model.clone()
   977             };
   978  
   979             if !resolved_model.is_empty() {
   980                 row.insert("model".to_string(), json!(resolved_model));
   981             }
   982         }
   983     }
   984  
   985     let parent = to_str(record.get("parent_id"));
   986     if !events.is_empty() && !parent.is_empty() {
   987         if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
   988             push_parent_link(&mut links, uid, &parent);
   989         }
   990     }
   991  
   992     (events, links, tools)
   993 }
   994  
   995 fn normalize_claude_event(
   996     record: &Value,
   997     ctx: &RecordContext<'_>,
   998     top_type: &str,
   999     base_uid: &str,
  1000 ) -> (Vec<Value>, Vec<Value>, Vec<Value>) {
  1001     let mut events = Vec::<Value>::new();
  1002     let mut links = Vec::<Value>::new();
  1003     let mut tools = Vec::<Value>::new();
  1004  
  1005     let parent_uuid = to_str(record.get("parentUuid"));
  1006     let request_id = to_str(record.get("requestId"));
  1007     let trace_id = to_str(record.get("requestId"));
  1008     let agent_run_id = to_str(record.get("agentId"));
  1009     let agent_label = to_str(record.get("agentName"));
  1010     let coord_group_label = to_str(record.get("teamName"));
  1011     let is_substream = to_u8_bool(record.get("isSidechain"));
  1012  
  1013     let message = record.get("message").cloned().unwrap_or(Value::Null);
  1014     let msg_role = to_str(message.get("role"));
  1015     let model = canonicalize_model("claude", &to_str(message.get("model")));
  1016  
  1017     let usage = message.get("usage").cloned().unwrap_or(Value::Null);
  1018     let input_tokens = to_u32(usage.get("input_tokens"));
  1019     let output_tokens = to_u32(usage.get("output_tokens"));
  1020     let cache_read_tokens = to_u32(usage.get("cache_read_input_tokens"));
  1021     let cache_write_tokens = to_u32(usage.get("cache_creation_input_tokens"));
  1022     let service_tier = to_str(usage.get("service_tier"));
  1023  
  1024     let stamp_common = |obj: &mut Map<String, Value>| {
  1025         obj.insert("request_id".to_string(), json!(request_id.clone()));
  1026         obj.insert("trace_id".to_string(), json!(trace_id.clone()));
  1027         obj.insert("agent_run_id".to_string(), json!(agent_run_id.clone()));
  1028         obj.insert("agent_label".to_string(), json!(agent_label.clone()));
  1029         obj.insert(
  1030             "coord_group_label".to_string(),
  1031             json!(coord_group_label.clone()),
  1032         );
  1033         obj.insert("is_substream".to_string(), json!(is_substream));
  1034         obj.insert("model".to_string(), json!(model.clone()));
  1035         obj.insert("input_tokens".to_string(), json!(input_tokens));
  1036         obj.insert("output_tokens".to_string(), json!(output_tokens));
  1037         obj.insert("cache_read_tokens".to_string(), json!(cache_read_tokens));
  1038         obj.insert("cache_write_tokens".to_string(), json!(cache_write_tokens));
  1039         obj.insert("service_tier".to_string(), json!(service_tier.clone()));
  1040         obj.insert("item_id".to_string(), json!(to_str(record.get("uuid"))));
  1041         obj.insert(
  1042             "origin_event_id".to_string(),
  1043             json!(to_str(record.get("sourceToolAssistantUUID"))),
  1044         );
  1045         obj.insert(
  1046             "origin_tool_call_id".to_string(),
  1047             json!(to_str(record.get("sourceToolUseID"))),
  1048         );
  1049     };
  1050  
  1051     if top_type == "assistant" || top_type == "user" {
  1052         let actor = if top_type == "assistant" {
  1053             "assistant"
  1054         } else if msg_role == "assistant" {
  1055             "assistant"
  1056         } else {
  1057             "user"
  1058         };
  1059  
  1060         let content = message.get("content").cloned().unwrap_or_else(|| {
  1061             record
  1062                 .get("message")
  1063                 .and_then(|m| m.get("content"))
  1064                 .cloned()
  1065                 .unwrap_or(Value::Null)
  1066         });
  1067  
  1068         match content {
  1069             Value::Array(items) if !items.is_empty() => {
  1070                 for (idx, item) in items.iter().enumerate() {
  1071                     let block_type = to_str(item.get("type"));
  1072                     let suffix = format!("claude:block:{}", idx);
  1073                     let block_uid = event_uid(
  1074                         ctx.source_file,
  1075                         ctx.source_generation,
  1076                         ctx.source_line_no,
  1077                         ctx.source_offset,
  1078                         &compact_json(item),
  1079                         &suffix,
  1080                     );
  1081  
  1082                     let mut row = match block_type.as_str() {
  1083                         "thinking" => {
  1084                             let mut r = base_event_obj(
  1085                                 ctx,
  1086                                 &block_uid,
  1087                                 "reasoning",
  1088                                 "thinking",
  1089                                 "assistant",
  1090                                 &extract_message_text(item),
  1091                                 &compact_json(item),
  1092                             );
  1093                             r.insert("has_reasoning".to_string(), json!(1u8));
  1094                             r.insert("content_types".to_string(), json!(["thinking"]));
  1095                             r
  1096                         }
  1097                         "tool_use" => {
  1098                             let tool_call_id = to_str(item.get("id"));
  1099                             let tool_name = to_str(item.get("name"));
  1100                             let input_json =
  1101                                 compact_json(item.get("input").unwrap_or(&Value::Null));
  1102                             let mut r = base_event_obj(
  1103                                 ctx,
  1104                                 &block_uid,
  1105                                 "tool_call",
  1106                                 "tool_use",
  1107                                 "assistant",
  1108                                 &extract_message_text(item.get("input").unwrap_or(&Value::Null)),
  1109                                 &compact_json(item),
  1110                             );
  1111                             r.insert("content_types".to_string(), json!(["tool_use"]));
  1112                             r.insert("tool_call_id".to_string(), json!(tool_call_id.clone()));
  1113                             r.insert("tool_name".to_string(), json!(tool_name.clone()));
  1114                             tools.push(build_tool_row(
  1115                                 ctx,
  1116                                 &block_uid,
  1117                                 &tool_call_id,
  1118                                 &to_str(record.get("parentToolUseID")),
  1119                                 &tool_name,
  1120                                 "request",
  1121                                 0,
  1122                                 &input_json,
  1123                                 "",
  1124                                 "",
  1125                             ));
  1126                             r
  1127                         }
  1128                         "tool_result" => {
  1129                             let tool_call_id = to_str(item.get("tool_use_id"));
  1130                             let output_json =
  1131                                 compact_json(item.get("content").unwrap_or(&Value::Null));
  1132                             let output_text =
  1133                                 extract_message_text(item.get("content").unwrap_or(&Value::Null));
  1134                             let tool_error = to_u8_bool(item.get("is_error"));
  1135                             let mut r = base_event_obj(
  1136                                 ctx,
  1137                                 &block_uid,
  1138                                 "tool_result",
  1139                                 "tool_result",
  1140                                 "tool",
  1141                                 &output_text,
  1142                                 &compact_json(item),
  1143                             );
  1144                             r.insert("content_types".to_string(), json!(["tool_result"]));
  1145                             r.insert("tool_call_id".to_string(), json!(tool_call_id.clone()));
  1146                             r.insert("tool_error".to_string(), json!(tool_error));
  1147                             tools.push(build_tool_row(
  1148                                 ctx,
  1149                                 &block_uid,
  1150                                 &tool_call_id,
  1151                                 &to_str(record.get("parentToolUseID")),
  1152                                 "",
  1153                                 "response",
  1154                                 tool_error,
  1155                                 "",
  1156                                 &output_json,
  1157                                 &output_text,
  1158                             ));
  1159                             r
  1160                         }
  1161                         _ => {
  1162                             let mut r = base_event_obj(
  1163                                 ctx,
  1164                                 &block_uid,
  1165                                 "message",
  1166                                 if block_type.is_empty() {
  1167                                     "text"
  1168                                 } else {
  1169                                     block_type.as_str()
  1170                                 },
  1171                                 actor,
  1172                                 &extract_message_text(item),
  1173                                 &compact_json(item),
  1174                             );
  1175                             if !block_type.is_empty() {
  1176                                 r.insert("content_types".to_string(), json!([block_type]));
  1177                             }
  1178                             r
  1179                         }
  1180                     };
  1181  
  1182                     stamp_common(&mut row);
  1183                     row.insert(
  1184                         "parent_tool_call_id".to_string(),
  1185                         json!(to_str(record.get("parentToolUseID"))),
  1186                     );
  1187                     row.insert(
  1188                         "origin_tool_call_id".to_string(),
  1189                         json!(to_str(record.get("sourceToolUseID"))),
  1190                     );
  1191                     row.insert(
  1192                         "tool_phase".to_string(),
  1193                         json!(to_str(record.get("stop_reason"))),
  1194                     );
  1195                     events.push(Value::Object(row));
  1196  
  1197                     if !parent_uuid.is_empty() {
  1198                         links.push(build_link_row(
  1199                             ctx,
  1200                             &block_uid,
  1201                             &parent_uuid,
  1202                             "parent_uuid",
  1203                             "{}",
  1204                         ));
  1205                     }
  1206                 }
  1207             }
  1208             _ => {
  1209                 let text = extract_message_text(&message);
  1210                 let mut row = base_event_obj(
  1211                     ctx,
  1212                     base_uid,
  1213                     "message",
  1214                     "message",
  1215                     actor,
  1216                     &text,
  1217                     &compact_json(record),
  1218                 );
  1219                 row.insert(
  1220                     "content_types".to_string(),
  1221                     json!(extract_content_types(
  1222                         message.get("content").unwrap_or(&Value::Null)
  1223                     )),
  1224                 );
  1225                 stamp_common(&mut row);
  1226                 events.push(Value::Object(row));
  1227                 if !parent_uuid.is_empty() {
  1228                     links.push(build_link_row(
  1229                         ctx,
  1230                         base_uid,
  1231                         &parent_uuid,
  1232                         "parent_uuid",
  1233                         "{}",
  1234                     ));
  1235                 }
  1236             }
  1237         }
  1238     } else {
  1239         let event_kind = match top_type {
  1240             "progress" => "progress",
  1241             "system" => "system",
  1242             "summary" => "summary",
  1243             "queue-operation" => "queue_operation",
  1244             "file-history-snapshot" => "file_history_snapshot",
  1245             _ => "unknown",
  1246         };
  1247  
  1248         let payload_type = if top_type == "progress" {
  1249             to_str(record.get("data").and_then(|d| d.get("type")))
  1250         } else if top_type == "system" {
  1251             to_str(record.get("subtype"))
  1252         } else {
  1253             top_type.to_string()
  1254         };
  1255  
  1256         let mut row = base_event_obj(
  1257             ctx,
  1258             base_uid,
  1259             event_kind,
  1260             if payload_type.is_empty() {
  1261                 top_type
  1262             } else {
  1263                 payload_type.as_str()
  1264             },
  1265             "system",
  1266             &extract_message_text(record),
  1267             &compact_json(record),
  1268         );
  1269         row.insert("op_kind".to_string(), json!(payload_type));
  1270         row.insert("op_status".to_string(), json!(to_str(record.get("status"))));
  1271         row.insert(
  1272             "latency_ms".to_string(),
  1273             json!(to_u32(record.get("durationMs"))),
  1274         );
  1275         row.insert(
  1276             "retry_count".to_string(),
  1277             json!(to_u16(record.get("retryAttempt"))),
  1278         );
  1279         stamp_common(&mut row);
  1280         events.push(Value::Object(row));
  1281  
  1282         if !parent_uuid.is_empty() {
  1283             links.push(build_link_row(
  1284                 ctx,
  1285                 base_uid,
  1286                 &parent_uuid,
  1287                 "parent_uuid",
  1288                 "{}",
  1289             ));
  1290         }
  1291     }
  1292  
  1293     if !events.is_empty() {
  1294         let tool_use_id = to_str(record.get("toolUseID"));
  1295         if !tool_use_id.is_empty() {
  1296             if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
  1297                 links.push(build_link_row(ctx, uid, &tool_use_id, "tool_use_id", "{}"));
  1298             }
  1299         }
  1300  
  1301         let source_tool_assistant = to_str(record.get("sourceToolAssistantUUID"));
  1302         if !source_tool_assistant.is_empty() {
  1303             if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
  1304                 links.push(build_link_row(
  1305                     ctx,
  1306                     uid,
  1307                     &source_tool_assistant,
  1308                     "source_tool_assistant",
  1309                     "{}",
  1310                 ));
  1311             }
  1312         }
  1313     }
  1314  
  1315     (events, links, tools)
  1316 }
  1317  
  1318 pub fn normalize_record(
  1319     record: &Value,
  1320     source_name: &str,
  1321     provider: &str,
  1322     source_file: &str,
  1323     source_inode: u64,
  1324     source_generation: u32,
  1325     source_line_no: u64,
  1326     source_offset: u64,
  1327     session_hint: &str,
  1328     model_hint: &str,
  1329 ) -> NormalizedRecord {
  1330     let record_ts = to_str(record.get("timestamp"));
  1331     let event_ts = parse_event_ts(&record_ts);
  1332     let top_type = to_str(record.get("type"));
  1333  
  1334     let mut session_id = if provider == "claude" {
  1335         to_str(record.get("sessionId"))
  1336     } else {
  1337         String::new()
  1338     };
  1339     if session_id.is_empty() {
  1340         session_id = if session_hint.is_empty() {
  1341             infer_session_id_from_file(source_file)
  1342         } else {
  1343             session_hint.to_string()
  1344         };
  1345     }
  1346  
  1347     if provider == "codex" && top_type == "session_meta" {
  1348         let payload = record.get("payload").cloned().unwrap_or(Value::Null);
  1349         let payload_id = to_str(payload.get("id"));
  1350         if !payload_id.is_empty() {
  1351             session_id = payload_id;
  1352         }
  1353     }
  1354  
  1355     let session_date = infer_session_date_from_file(source_file, &record_ts);
  1356  
  1357     let raw_json = compact_json(record);
  1358     let base_uid = event_uid(
  1359         source_file,
  1360         source_generation,
  1361         source_line_no,
  1362         source_offset,
  1363         &raw_json,
  1364         "raw",
  1365     );
  1366  
  1367     let raw_row = json!({
  1368         "source_name": source_name,
  1369         "provider": provider,
  1370         "source_file": source_file,
  1371         "source_inode": source_inode,
  1372         "source_generation": source_generation,
  1373         "source_line_no": source_line_no,
  1374         "source_offset": source_offset,
  1375         "record_ts": record_ts,
  1376         "top_type": top_type,
  1377         "session_id": session_id,
  1378         "raw_json": raw_json,
  1379         "raw_json_hash": raw_hash(&raw_json),
  1380         "event_uid": base_uid,
  1381     });
  1382  
  1383     let ctx = RecordContext {
  1384         source_name,
  1385         provider,
  1386         session_id: &session_id,
  1387         session_date: &session_date,
  1388         source_file,
  1389         source_inode,
  1390         source_generation,
  1391         source_line_no,
  1392         source_offset,
  1393         record_ts: &record_ts,
  1394         event_ts: &event_ts,
  1395     };
  1396  
  1397     let (event_rows, link_rows, tool_rows) = if provider == "claude" {
  1398         normalize_claude_event(record, &ctx, &top_type, &base_uid)
  1399     } else {
  1400         normalize_codex_event(record, &ctx, &top_type, &base_uid, model_hint)
  1401     };
  1402     let model_hint = resolve_model_hint(&event_rows, provider, model_hint);
  1403  
  1404     NormalizedRecord {
  1405         raw_row,
  1406         event_rows,
  1407         link_rows,
  1408         tool_rows,
  1409         session_hint: session_id,
  1410         model_hint,
  1411     }
  1412 }
  1413  
  1414 #[cfg(test)]
  1415 mod tests {
  1416     use super::normalize_record;
  1417     use serde_json::json;
  1418  
  1419     #[test]
  1420     fn codex_tool_call_normalization() {
  1421         let record = json!({
  1422             "timestamp": "2026-02-14T02:28:00.000Z",
  1423             "type": "response_item",
  1424             "payload": {
  1425                 "type": "function_call",
  1426                 "call_id": "call_123",
  1427                 "name": "Read",
  1428                 "arguments": "{\"path\":\"README.md\"}"
  1429             }
  1430         });
  1431  
  1432         let out = normalize_record(
  1433             &record,
  1434             "codex",
  1435             "codex",
  1436             "/Users/eric/.codex/sessions/2026/02/13/session-019c59f9-6389-77a1-a0cb-304eecf935b6.jsonl",
  1437             123,
  1438             1,
  1439             42,
  1440             1024,
  1441             "",
  1442             "",
  1443         );
  1444  
  1445         assert_eq!(out.event_rows.len(), 1);
  1446         assert_eq!(out.tool_rows.len(), 1);
  1447         let row = out.event_rows[0].as_object().unwrap();
  1448         assert_eq!(
  1449             row.get("event_kind").unwrap().as_str().unwrap(),
  1450             "tool_call"
  1451         );
  1452         assert_eq!(row.get("tool_name").unwrap().as_str().unwrap(), "Read");
  1453     }
  1454  
  1455     #[test]
  1456     fn codex_turn_context_promotes_model_and_turn_id() {
  1457         let record = json!({
  1458             "timestamp": "2026-02-15T03:50:42.191Z",
  1459             "type": "turn_context",
  1460             "payload": {
  1461                 "turn_id": "019c5f6a-49bd-7920-ac67-1dd8e33b0e95",
  1462                 "model": "gpt-5.3-codex"
  1463             }
  1464         });
  1465  
  1466         let out = normalize_record(
  1467             &record,
  1468             "codex",
  1469             "codex",
  1470             "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  1471             1,
  1472             1,
  1473             1,
  1474             1,
  1475             "",
  1476             "",
  1477         );
  1478  
  1479         let row = out.event_rows[0].as_object().unwrap();
  1480         assert_eq!(row.get("payload_type").unwrap().as_str().unwrap(), "turn_context");
  1481         assert_eq!(row.get("model").unwrap().as_str().unwrap(), "gpt-5.3-codex");
  1482         assert_eq!(
  1483             row.get("request_id").unwrap().as_str().unwrap(),
  1484             "019c5f6a-49bd-7920-ac67-1dd8e33b0e95"
  1485         );
  1486         assert_eq!(
  1487             row.get("item_id").unwrap().as_str().unwrap(),
  1488             "019c5f6a-49bd-7920-ac67-1dd8e33b0e95"
  1489         );
  1490     }
  1491  
  1492     #[test]
  1493     fn codex_token_count_promotes_usage_fields() {
  1494         let record = json!({
  1495             "timestamp": "2026-02-15T03:50:50.838Z",
  1496             "type": "event_msg",
  1497             "payload": {
  1498                 "type": "token_count",
  1499                 "info": {
  1500                     "last_token_usage": {
  1501                         "input_tokens": 65323,
  1502                         "output_tokens": 445,
  1503                         "cached_input_tokens": 58624
  1504                     }
  1505                 },
  1506                 "rate_limits": {
  1507                     "limit_name": "GPT-5.3-Codex-Spark",
  1508                     "limit_id": "codex_bengalfox",
  1509                     "plan_type": "pro"
  1510                 }
  1511             }
  1512         });
  1513  
  1514         let out = normalize_record(
  1515             &record,
  1516             "codex",
  1517             "codex",
  1518             "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  1519             1,
  1520             1,
  1521             2,
  1522             2,
  1523             "",
  1524             "",
  1525         );
  1526  
  1527         let row = out.event_rows[0].as_object().unwrap();
  1528         assert_eq!(row.get("payload_type").unwrap().as_str().unwrap(), "token_count");
  1529         assert_eq!(row.get("input_tokens").unwrap().as_u64().unwrap(), 65323);
  1530         assert_eq!(row.get("output_tokens").unwrap().as_u64().unwrap(), 445);
  1531         assert_eq!(row.get("cache_read_tokens").unwrap().as_u64().unwrap(), 58624);
  1532         assert_eq!(
  1533             row.get("model").unwrap().as_str().unwrap(),
  1534             "gpt-5.3-codex-spark"
  1535         );
  1536         assert_eq!(row.get("service_tier").unwrap().as_str().unwrap(), "pro");
  1537         assert!(
  1538             !row.get("token_usage_json")
  1539                 .unwrap()
  1540                 .as_str()
  1541                 .unwrap()
  1542                 .is_empty()
  1543         );
  1544     }
  1545  
  1546     #[test]
  1547     fn codex_token_count_alias_codex_maps_to_xhigh() {
  1548         let record = json!({
  1549             "timestamp": "2026-02-15T04:52:55.538Z",
  1550             "type": "event_msg",
  1551             "payload": {
  1552                 "type": "token_count",
  1553                 "info": {
  1554                     "last_token_usage": {
  1555                         "input_tokens": 72636,
  1556                         "output_tokens": 285,
  1557                         "cached_input_tokens": 70784
  1558                     }
  1559                 },
  1560                 "rate_limits": {
  1561                     "limit_id": "codex",
  1562                     "limit_name": null,
  1563                     "plan_type": "pro"
  1564                 }
  1565             }
  1566         });
  1567  
  1568         let out = normalize_record(
  1569             &record,
  1570             "codex",
  1571             "codex",
  1572             "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  1573             1,
  1574             1,
  1575             4,
  1576             4,
  1577             "",
  1578             "",
  1579         );
  1580  
  1581         let row = out.event_rows[0].as_object().unwrap();
  1582         assert_eq!(
  1583             row.get("model").unwrap().as_str().unwrap(),
  1584             "gpt-5.3-codex-xhigh"
  1585         );
  1586     }
  1587  
  1588     #[test]
  1589     fn codex_custom_tool_call_promotes_tool_fields() {
  1590         let record = json!({
  1591             "timestamp": "2026-02-15T03:50:50.838Z",
  1592             "type": "response_item",
  1593             "payload": {
  1594                 "type": "custom_tool_call",
  1595                 "call_id": "call_abc",
  1596                 "name": "apply_patch",
  1597                 "status": "completed",
  1598                 "input": "*** Begin Patch\n*** End Patch\n"
  1599             }
  1600         });
  1601  
  1602         let out = normalize_record(
  1603             &record,
  1604             "codex",
  1605             "codex",
  1606             "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
  1607             1,
  1608             1,
  1609             3,
  1610             3,
  1611             "",
  1612             "",
  1613         );
  1614  
  1615         assert_eq!(out.event_rows.len(), 1);
  1616         assert_eq!(out.tool_rows.len(), 1);
  1617         let row = out.event_rows[0].as_object().unwrap();
  1618         assert_eq!(
  1619             row.get("event_kind").unwrap().as_str().unwrap(),
  1620             "tool_call"
  1621         );
  1622         assert_eq!(
  1623             row.get("tool_call_id").unwrap().as_str().unwrap(),
  1624             "call_abc"
  1625         );
  1626         assert_eq!(
  1627             row.get("tool_name").unwrap().as_str().unwrap(),
  1628             "apply_patch"
  1629         );
  1630         assert_eq!(
  1631             row.get("op_status").unwrap().as_str().unwrap(),
  1632             "completed"
  1633         );
  1634     }
  1635  
  1636     #[test]
  1637     fn claude_tool_use_and_result_blocks() {
  1638         let record = json!({
  1639             "type": "assistant",
  1640             "sessionId": "7c666c01-d38e-4658-8650-854ffb5b626e",
  1641             "uuid": "assistant-1",
  1642             "parentUuid": "user-1",
  1643             "requestId": "req-1",
  1644             "timestamp": "2026-01-19T15:58:41.421Z",
  1645             "message": {
  1646                 "model": "claude-opus-4-5-20251101",
  1647                 "role": "assistant",
  1648                 "usage": {
  1649                     "input_tokens": 9,
  1650                     "output_tokens": 5,
  1651                     "cache_creation_input_tokens": 19630,
  1652                     "cache_read_input_tokens": 0,
  1653                     "service_tier": "standard"
  1654                 },
  1655                 "content": [
  1656                     {
  1657                         "type": "tool_use",
  1658                         "id": "toolu_1",
  1659                         "name": "WebFetch",
  1660                         "input": {"url": "https://example.com"}
  1661                     },
  1662                     {
  1663                         "type": "text",
  1664                         "text": "done"
  1665                     }
  1666                 ]
  1667             }
  1668         });
  1669  
  1670         let out = normalize_record(
  1671             &record,
  1672             "claude",
  1673             "claude",
  1674             "/Users/eric/.claude/projects/p1/s1.jsonl",
  1675             55,
  1676             2,
  1677             10,
  1678             100,
  1679             "",
  1680             "",
  1681         );
  1682  
  1683         assert_eq!(out.event_rows.len(), 2);
  1684         assert_eq!(out.tool_rows.len(), 1);
  1685  
  1686         let first = out.event_rows[0].as_object().unwrap();
  1687         assert_eq!(
  1688             first.get("event_kind").unwrap().as_str().unwrap(),
  1689             "tool_call"
  1690         );
  1691         assert_eq!(first.get("provider").unwrap().as_str().unwrap(), "claude");
  1692     }
  1693 }