rust/ingestor/src/normalize.rs
1 use crate::model::NormalizedRecord;
2 use chrono::{DateTime, Utc};
3 use regex::Regex;
4 use serde_json::{json, Map, Value};
5 use sha2::{Digest, Sha256};
6 use std::sync::OnceLock;
7 use std::time::{SystemTime, UNIX_EPOCH};
8
/// Maximum length, in `char`s, kept for free-text fields on event and tool
/// rows (text content, tool input/output); longer values are cut by
/// `truncate_chars`.
const TEXT_LIMIT: usize = 200_000;
/// Maximum length, in `char`s, of the short preview fields derived from the
/// (already truncated) text fields.
const PREVIEW_LIMIT: usize = 320;
11
12 fn session_id_re() -> &'static Regex {
13 static SESSION_ID_RE: OnceLock<Regex> = OnceLock::new();
14 SESSION_ID_RE.get_or_init(|| {
15 Regex::new(
16 r"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})$",
17 )
18 .expect("valid session id regex")
19 })
20 }
21
22 fn session_date_re() -> &'static Regex {
23 static SESSION_DATE_RE: OnceLock<Regex> = OnceLock::new();
24 SESSION_DATE_RE.get_or_init(|| {
25 Regex::new(r"/(?:sessions|projects)/(\d{4})/(\d{2})/(\d{2})/")
26 .expect("valid session date regex")
27 })
28 }
29
30 fn to_str(value: Option<&Value>) -> String {
31 match value {
32 None | Some(Value::Null) => String::new(),
33 Some(Value::String(s)) => s.clone(),
34 Some(other) => other.to_string(),
35 }
36 }
37
38 fn to_u32(value: Option<&Value>) -> u32 {
39 match value {
40 Some(Value::Number(n)) => n.as_u64().unwrap_or(0).min(u32::MAX as u64) as u32,
41 Some(Value::String(s)) => s.parse::<u64>().unwrap_or(0).min(u32::MAX as u64) as u32,
42 _ => 0,
43 }
44 }
45
46 fn to_u16(value: Option<&Value>) -> u16 {
47 to_u32(value).min(u16::MAX as u32) as u16
48 }
49
50 fn to_u8_bool(value: Option<&Value>) -> u8 {
51 match value {
52 Some(Value::Bool(v)) => {
53 if *v {
54 1
55 } else {
56 0
57 }
58 }
59 Some(Value::Number(v)) => {
60 if v.as_i64().unwrap_or(0) != 0 {
61 1
62 } else {
63 0
64 }
65 }
66 Some(Value::String(s)) => {
67 let lower = s.to_ascii_lowercase();
68 if lower == "true" || lower == "1" {
69 1
70 } else {
71 0
72 }
73 }
74 _ => 0,
75 }
76 }
77
/// Normalizes a model name for storage.
///
/// Trims the name, lowercases ASCII letters and replaces each space with
/// `-`. An all-whitespace or empty input yields `""`. For the `codex`
/// provider, the bare alias `codex` is expanded to `gpt-5.3-codex-xhigh`.
fn canonicalize_model(provider: &str, raw_model: &str) -> String {
    let trimmed = raw_model.trim();
    if trimmed.is_empty() {
        return String::new();
    }

    let model = trimmed.to_ascii_lowercase().replace(' ', "-");
    match (provider, model.as_str()) {
        ("codex", "codex") => "gpt-5.3-codex-xhigh".to_string(),
        _ => model,
    }
}
92
93 fn resolve_model_hint(event_rows: &[Value], provider: &str, fallback: &str) -> String {
94 for row in event_rows.iter().rev() {
95 if let Some(model) = row.get("model").and_then(Value::as_str) {
96 let normalized = canonicalize_model(provider, model);
97 if !normalized.is_empty() {
98 return normalized;
99 }
100 }
101 }
102
103 canonicalize_model(provider, fallback)
104 }
105
106 fn compact_json(value: &Value) -> String {
107 serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string())
108 }
109
/// Returns at most the first `max_chars` Unicode scalar values of `input`.
///
/// The cut always lands on a `char` boundary, so the result is valid UTF-8.
fn truncate_chars(input: &str, max_chars: usize) -> String {
    match input.char_indices().nth(max_chars) {
        // A char exists at position `max_chars`, so cut just before it.
        Some((byte_idx, _)) => input[..byte_idx].to_string(),
        // The input has at most `max_chars` chars: keep all of it.
        None => input.to_string(),
    }
}
117
118 fn extract_message_text(content: &Value) -> String {
119 fn walk(node: &Value, out: &mut Vec<String>) {
120 match node {
121 Value::String(s) => {
122 if !s.trim().is_empty() {
123 out.push(s.clone());
124 }
125 }
126 Value::Array(items) => {
127 for item in items {
128 walk(item, out);
129 }
130 }
131 Value::Object(map) => {
132 for key in ["text", "message", "output", "thinking", "summary"] {
133 if let Some(Value::String(s)) = map.get(key) {
134 if !s.trim().is_empty() {
135 out.push(s.clone());
136 }
137 }
138 }
139
140 for key in ["content", "text_elements", "input"] {
141 if let Some(value) = map.get(key) {
142 walk(value, out);
143 }
144 }
145 }
146 _ => {}
147 }
148 }
149
150 let mut chunks = Vec::<String>::new();
151 walk(content, &mut chunks);
152 truncate_chars(&chunks.join("\n"), TEXT_LIMIT)
153 }
154
155 fn extract_content_types(content: &Value) -> Vec<String> {
156 if let Value::Array(items) = content {
157 let mut out = Vec::<String>::new();
158 for item in items {
159 if let Some(t) = item.get("type").and_then(|v| v.as_str()) {
160 if !t.is_empty() {
161 out.push(t.to_string());
162 }
163 }
164 }
165 out.sort();
166 out.dedup();
167 return out;
168 }
169 Vec::new()
170 }
171
172 pub fn infer_session_id_from_file(source_file: &str) -> String {
173 let stem = std::path::Path::new(source_file)
174 .file_stem()
175 .and_then(|s| s.to_str())
176 .unwrap_or_default();
177
178 session_id_re()
179 .captures(stem)
180 .and_then(|cap| cap.get(1).map(|m| m.as_str().to_string()))
181 .unwrap_or_default()
182 }
183
184 pub fn infer_session_date_from_file(source_file: &str, record_ts: &str) -> String {
185 if let Some(cap) = session_date_re().captures(source_file) {
186 return format!("{}-{}-{}", &cap[1], &cap[2], &cap[3]);
187 }
188
189 parse_event_ts(record_ts)
190 .split(' ')
191 .next()
192 .unwrap_or("1970-01-01")
193 .to_string()
194 }
195
196 fn parse_event_ts(record_ts: &str) -> String {
197 if let Ok(dt) = DateTime::parse_from_rfc3339(record_ts) {
198 return dt
199 .with_timezone(&Utc)
200 .format("%Y-%m-%d %H:%M:%S%.3f")
201 .to_string();
202 }
203
204 Utc::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string()
205 }
206
207 fn event_uid(
208 source_file: &str,
209 source_generation: u32,
210 source_line_no: u64,
211 source_offset: u64,
212 record_fingerprint: &str,
213 suffix: &str,
214 ) -> String {
215 let material = format!(
216 "{}|{}|{}|{}|{}|{}",
217 source_file, source_generation, source_line_no, source_offset, record_fingerprint, suffix
218 );
219
220 let mut hasher = Sha256::new();
221 hasher.update(material.as_bytes());
222 format!("{:x}", hasher.finalize())
223 }
224
/// Returns the current wall-clock time in milliseconds since the Unix epoch,
/// or `0` if the system clock reads earlier than the epoch.
fn event_version() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Default::default(),
    };
    since_epoch.as_millis() as u64
}
232
233 fn raw_hash(raw_json: &str) -> u64 {
234 let mut hasher = Sha256::new();
235 hasher.update(raw_json.as_bytes());
236 let digest = hasher.finalize();
237 let hex = format!("{:x}", digest);
238 u64::from_str_radix(&hex[..16], 16).unwrap_or(0)
239 }
240
241 fn io_hash(input_json: &str, output_json: &str) -> u64 {
242 raw_hash(&format!("{}\n{}", input_json, output_json))
243 }
244
/// Provenance and timing for one source record, shared by every row
/// (event, link, tool) derived from it.
///
/// All fields borrow from the caller for `'a`; row builders copy them into
/// owned JSON values.
struct RecordContext<'a> {
    // Session identity.
    /// Session identifier written to every row's `session_id`.
    session_id: &'a str,
    /// Session day. Expected as `YYYY-MM-DD` when produced by
    /// `infer_session_date_from_file` (confirm at the construction site).
    session_date: &'a str,

    // Source identity.
    /// Provider tag written to every row's `provider` column.
    provider: &'a str,
    /// Source name written to every row's `source_name` column.
    source_name: &'a str,

    // Location of the record within its file.
    /// Path of the file the record was read from.
    source_file: &'a str,
    /// Inode of `source_file`, copied into event rows.
    source_inode: u64,
    /// Generation counter for `source_file`; part of `source_ref` and of the
    /// `event_uid` hash material.
    source_generation: u32,
    /// Line number of the record; part of `source_ref` and `event_uid`.
    source_line_no: u64,
    /// Offset of the record within the file (presumably bytes; confirm);
    /// part of the `event_uid` hash material.
    source_offset: u64,

    // Timestamps.
    /// Timestamp string as found in the record, stored verbatim.
    record_ts: &'a str,
    /// Normalized event timestamp (presumably `parse_event_ts` output;
    /// confirm at the construction site).
    event_ts: &'a str,
}
258
259 fn base_event_obj(
260 ctx: &RecordContext<'_>,
261 event_uid: &str,
262 event_kind: &str,
263 payload_type: &str,
264 actor_kind: &str,
265 text_content: &str,
266 payload_json: &str,
267 ) -> Map<String, Value> {
268 let text_content = truncate_chars(text_content, TEXT_LIMIT);
269 let mut obj = Map::<String, Value>::new();
270 obj.insert(
271 "event_uid".to_string(),
272 Value::String(event_uid.to_string()),
273 );
274 obj.insert(
275 "session_id".to_string(),
276 Value::String(ctx.session_id.to_string()),
277 );
278 obj.insert(
279 "session_date".to_string(),
280 Value::String(ctx.session_date.to_string()),
281 );
282 obj.insert(
283 "source_name".to_string(),
284 Value::String(ctx.source_name.to_string()),
285 );
286 obj.insert(
287 "provider".to_string(),
288 Value::String(ctx.provider.to_string()),
289 );
290 obj.insert(
291 "source_file".to_string(),
292 Value::String(ctx.source_file.to_string()),
293 );
294 obj.insert("source_inode".to_string(), json!(ctx.source_inode));
295 obj.insert(
296 "source_generation".to_string(),
297 json!(ctx.source_generation),
298 );
299 obj.insert("source_line_no".to_string(), json!(ctx.source_line_no));
300 obj.insert("source_offset".to_string(), json!(ctx.source_offset));
301 obj.insert(
302 "source_ref".to_string(),
303 Value::String(format!(
304 "{}:{}:{}",
305 ctx.source_file, ctx.source_generation, ctx.source_line_no
306 )),
307 );
308 obj.insert(
309 "record_ts".to_string(),
310 Value::String(ctx.record_ts.to_string()),
311 );
312 obj.insert(
313 "event_ts".to_string(),
314 Value::String(ctx.event_ts.to_string()),
315 );
316 obj.insert(
317 "event_kind".to_string(),
318 Value::String(event_kind.to_string()),
319 );
320 obj.insert(
321 "actor_kind".to_string(),
322 Value::String(actor_kind.to_string()),
323 );
324 obj.insert(
325 "payload_type".to_string(),
326 Value::String(payload_type.to_string()),
327 );
328 obj.insert("op_kind".to_string(), Value::String(String::new()));
329 obj.insert("op_status".to_string(), Value::String(String::new()));
330 obj.insert("request_id".to_string(), Value::String(String::new()));
331 obj.insert("trace_id".to_string(), Value::String(String::new()));
332 obj.insert("turn_index".to_string(), json!(0u32));
333 obj.insert("item_id".to_string(), Value::String(String::new()));
334 obj.insert("tool_call_id".to_string(), Value::String(String::new()));
335 obj.insert(
336 "parent_tool_call_id".to_string(),
337 Value::String(String::new()),
338 );
339 obj.insert("origin_event_id".to_string(), Value::String(String::new()));
340 obj.insert(
341 "origin_tool_call_id".to_string(),
342 Value::String(String::new()),
343 );
344 obj.insert("tool_name".to_string(), Value::String(String::new()));
345 obj.insert("tool_phase".to_string(), Value::String(String::new()));
346 obj.insert("tool_error".to_string(), json!(0u8));
347 obj.insert("agent_run_id".to_string(), Value::String(String::new()));
348 obj.insert("agent_label".to_string(), Value::String(String::new()));
349 obj.insert("coord_group_id".to_string(), Value::String(String::new()));
350 obj.insert(
351 "coord_group_label".to_string(),
352 Value::String(String::new()),
353 );
354 obj.insert("is_substream".to_string(), json!(0u8));
355 obj.insert("model".to_string(), Value::String(String::new()));
356 obj.insert("input_tokens".to_string(), json!(0u32));
357 obj.insert("output_tokens".to_string(), json!(0u32));
358 obj.insert("cache_read_tokens".to_string(), json!(0u32));
359 obj.insert("cache_write_tokens".to_string(), json!(0u32));
360 obj.insert("latency_ms".to_string(), json!(0u32));
361 obj.insert("retry_count".to_string(), json!(0u16));
362 obj.insert("service_tier".to_string(), Value::String(String::new()));
363 obj.insert("content_types".to_string(), json!([]));
364 obj.insert("has_reasoning".to_string(), json!(0u8));
365 obj.insert(
366 "text_content".to_string(),
367 Value::String(text_content.clone()),
368 );
369 obj.insert(
370 "text_preview".to_string(),
371 Value::String(truncate_chars(&text_content, PREVIEW_LIMIT)),
372 );
373 obj.insert(
374 "payload_json".to_string(),
375 Value::String(payload_json.to_string()),
376 );
377 obj.insert("token_usage_json".to_string(), Value::String(String::new()));
378 obj.insert("event_version".to_string(), json!(event_version()));
379 obj
380 }
381
382 fn build_link_row(
383 ctx: &RecordContext<'_>,
384 event_uid: &str,
385 linked_event_uid: &str,
386 link_type: &str,
387 metadata_json: &str,
388 ) -> Value {
389 json!({
390 "event_uid": event_uid,
391 "linked_event_uid": linked_event_uid,
392 "link_type": link_type,
393 "session_id": ctx.session_id,
394 "provider": ctx.provider,
395 "source_name": ctx.source_name,
396 "metadata_json": metadata_json,
397 "event_version": event_version(),
398 })
399 }
400
401 fn build_tool_row(
402 ctx: &RecordContext<'_>,
403 event_uid: &str,
404 tool_call_id: &str,
405 parent_tool_call_id: &str,
406 tool_name: &str,
407 tool_phase: &str,
408 tool_error: u8,
409 input_json: &str,
410 output_json: &str,
411 output_text: &str,
412 ) -> Value {
413 let input_json = truncate_chars(input_json, TEXT_LIMIT);
414 let output_json = truncate_chars(output_json, TEXT_LIMIT);
415 let output_text = truncate_chars(output_text, TEXT_LIMIT);
416
417 json!({
418 "event_uid": event_uid,
419 "session_id": ctx.session_id,
420 "provider": ctx.provider,
421 "source_name": ctx.source_name,
422 "tool_call_id": tool_call_id,
423 "parent_tool_call_id": parent_tool_call_id,
424 "tool_name": tool_name,
425 "tool_phase": tool_phase,
426 "tool_error": tool_error,
427 "input_json": input_json,
428 "output_json": output_json,
429 "output_text": output_text,
430 "input_bytes": input_json.len() as u32,
431 "output_bytes": output_json.len() as u32,
432 "input_preview": truncate_chars(&input_json, PREVIEW_LIMIT),
433 "output_preview": truncate_chars(&output_text, PREVIEW_LIMIT),
434 "io_hash": io_hash(&input_json, &output_json),
435 "source_ref": format!("{}:{}:{}", ctx.source_file, ctx.source_generation, ctx.source_line_no),
436 "event_version": event_version(),
437 })
438 }
439
/// Normalizes one Codex JSONL record into `(events, links, tools)` rows.
///
/// * `record` is the parsed JSONL line.
/// * `ctx` is the provenance and timestamp context shared by every emitted
///   row.
/// * `top_type` is the record's top-level type discriminator (presumably
///   its `type` field; confirm at the call site). Handled values are
///   `session_meta`, `turn_context`, `response_item`, `event_msg`,
///   `compacted`, and the bare `message` / `function_call` /
///   `function_call_output` / `reasoning` forms. Anything else becomes an
///   `unknown` event.
/// * `base_uid` is the event uid used for the primary row of this record.
/// * `model_hint` is a model name used when neither the row nor the payload
///   names one.
///
/// Each event row starts from `base_event_obj` defaults and is specialized
/// per type. Afterwards every event's `model` is resolved in this order:
/// the row's own model, then the payload `model`, then `model_hint`. If the
/// record has a `parent_id`, one `parent_event` link is added for the first
/// event only.
fn normalize_codex_event(
    record: &Value,
    ctx: &RecordContext<'_>,
    top_type: &str,
    base_uid: &str,
    model_hint: &str,
) -> (Vec<Value>, Vec<Value>, Vec<Value>) {
    let mut events = Vec::<Value>::new();
    let mut links = Vec::<Value>::new();
    let mut tools = Vec::<Value>::new();

    // A missing or non-object `payload` is treated as an empty map, so
    // `payload_json` is "{}" in that case.
    let payload = record.get("payload").cloned().unwrap_or(Value::Null);
    let payload_obj = payload.as_object().cloned().unwrap_or_else(Map::new);
    let payload_json = compact_json(&Value::Object(payload_obj.clone()));

    // Adds a `parent_event` link unless `parent` is empty.
    let push_parent_link = |links: &mut Vec<Value>, uid: &str, parent: &str| {
        if !parent.is_empty() {
            links.push(build_link_row(ctx, uid, parent, "parent_event", "{}"));
        }
    };

    match top_type {
        "session_meta" => {
            let mut row = base_event_obj(
                ctx,
                base_uid,
                "session_meta",
                "session_meta",
                "system",
                "",
                &payload_json,
            );
            row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
            events.push(Value::Object(row));
        }
        "turn_context" => {
            let mut row = base_event_obj(
                ctx,
                base_uid,
                "turn_context",
                "turn_context",
                "system",
                "",
                &payload_json,
            );
            // NOTE(review): `to_u32` yields 0 unless `turn_id` is a number or
            // an all-digit string, so non-numeric turn ids leave turn_index
            // at 0. Confirm this is intended.
            row.insert(
                "turn_index".to_string(),
                json!(to_u32(payload_obj.get("turn_id"))),
            );
            let turn_id = to_str(payload_obj.get("turn_id"));
            if !turn_id.is_empty() {
                row.insert("request_id".to_string(), json!(turn_id.clone()));
                row.insert("item_id".to_string(), json!(turn_id));
            }
            let model = canonicalize_model("codex", &to_str(payload_obj.get("model")));
            if !model.is_empty() {
                row.insert("model".to_string(), json!(model));
            }
            events.push(Value::Object(row));
        }
        "response_item" => {
            // Dispatch on the payload's own `type`.
            let payload_type = to_str(payload_obj.get("type"));
            match payload_type.as_str() {
                "message" => {
                    let role = to_str(payload_obj.get("role"));
                    let content = payload_obj.get("content").cloned().unwrap_or(Value::Null);
                    let text = extract_message_text(&content);
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "message",
                        "message",
                        // A missing role is attributed to the assistant.
                        if role.is_empty() {
                            "assistant"
                        } else {
                            role.as_str()
                        },
                        &text,
                        &payload_json,
                    );
                    row.insert(
                        "content_types".to_string(),
                        json!(extract_content_types(&content)),
                    );
                    row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
                    // The payload's `phase` is stored as op_status.
                    row.insert(
                        "op_status".to_string(),
                        json!(to_str(payload_obj.get("phase"))),
                    );
                    events.push(Value::Object(row));
                }
                "function_call" => {
                    let args = to_str(payload_obj.get("arguments"));
                    let call_id = to_str(payload_obj.get("call_id"));
                    let name = to_str(payload_obj.get("name"));
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "tool_call",
                        "function_call",
                        "assistant",
                        &args,
                        &payload_json,
                    );
                    row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                    row.insert("tool_name".to_string(), json!(name.clone()));
                    events.push(Value::Object(row));

                    // Request half of the tool call: arguments as input_json.
                    tools.push(build_tool_row(
                        ctx, base_uid, &call_id, "", &name, "request", 0, &args, "", "",
                    ));
                }
                "function_call_output" => {
                    let output = to_str(payload_obj.get("output"));
                    let call_id = to_str(payload_obj.get("call_id"));
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "tool_result",
                        "function_call_output",
                        "tool",
                        &output,
                        &payload_json,
                    );
                    row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                    events.push(Value::Object(row));

                    // Response half: the raw `output` value is serialized as
                    // output_json; its string form is output_text.
                    tools.push(build_tool_row(
                        ctx,
                        base_uid,
                        &call_id,
                        "",
                        "",
                        "response",
                        0,
                        "",
                        &compact_json(payload_obj.get("output").unwrap_or(&Value::Null)),
                        &output,
                    ));
                }
                "custom_tool_call" => {
                    let input = to_str(payload_obj.get("input"));
                    let call_id = to_str(payload_obj.get("call_id"));
                    let name = to_str(payload_obj.get("name"));
                    let status = to_str(payload_obj.get("status"));
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "tool_call",
                        "custom_tool_call",
                        "assistant",
                        &input,
                        &payload_json,
                    );
                    row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                    row.insert("tool_name".to_string(), json!(name.clone()));
                    row.insert("op_status".to_string(), json!(status));
                    events.push(Value::Object(row));

                    tools.push(build_tool_row(
                        ctx, base_uid, &call_id, "", &name, "request", 0, &input, "", "",
                    ));
                }
                "custom_tool_call_output" => {
                    let output = to_str(payload_obj.get("output"));
                    let call_id = to_str(payload_obj.get("call_id"));
                    let status = to_str(payload_obj.get("status"));
                    // If the output string is itself JSON, store its compact
                    // re-serialization. Otherwise serialize the raw `output`
                    // value.
                    let output_json = serde_json::from_str::<Value>(&output)
                        .map(|parsed| compact_json(&parsed))
                        .unwrap_or_else(|_| {
                            compact_json(payload_obj.get("output").unwrap_or(&Value::Null))
                        });

                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "tool_result",
                        "custom_tool_call_output",
                        "tool",
                        &output,
                        &payload_json,
                    );
                    row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                    row.insert("op_status".to_string(), json!(status));
                    events.push(Value::Object(row));

                    tools.push(build_tool_row(
                        ctx,
                        base_uid,
                        &call_id,
                        "",
                        "",
                        "response",
                        0,
                        "",
                        &output_json,
                        &output,
                    ));
                }
                "web_search_call" => {
                    let action = payload_obj.get("action").cloned().unwrap_or(Value::Null);
                    let action_type = to_str(action.get("type"));
                    let status = to_str(payload_obj.get("status"));
                    // NOTE(review): extract_message_text only reads the
                    // text/message/output/thinking/summary keys and descends
                    // into content/text_elements/input. A search query stored
                    // under another key (e.g. `query`) will not reach
                    // text_content. Check against the Codex action schema.
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "tool_call",
                        "web_search_call",
                        "assistant",
                        &extract_message_text(&action),
                        &payload_json,
                    );
                    row.insert("tool_name".to_string(), json!("web_search"));
                    row.insert("op_kind".to_string(), json!(action_type));
                    // Status doubles as both op_status and tool_phase. No
                    // tool row is emitted for web searches.
                    row.insert("op_status".to_string(), json!(status.clone()));
                    row.insert("tool_phase".to_string(), json!(status));
                    events.push(Value::Object(row));
                }
                "reasoning" => {
                    let summary = payload_obj.get("summary").cloned().unwrap_or(Value::Null);
                    let mut row = base_event_obj(
                        ctx,
                        base_uid,
                        "reasoning",
                        "reasoning",
                        "assistant",
                        &extract_message_text(&summary),
                        &payload_json,
                    );
                    row.insert("has_reasoning".to_string(), json!(1u8));
                    row.insert("item_id".to_string(), json!(to_str(payload_obj.get("id"))));
                    events.push(Value::Object(row));
                }
                _ => {
                    // Unrecognized response item: keep it as an `unknown`
                    // event labelled with its payload type, if any.
                    events.push(Value::Object(base_event_obj(
                        ctx,
                        base_uid,
                        "unknown",
                        if payload_type.is_empty() {
                            "response_item"
                        } else {
                            payload_type.as_str()
                        },
                        "system",
                        &extract_message_text(&payload),
                        &payload_json,
                    )));
                }
            }
        }
        "event_msg" => {
            let payload_type = to_str(payload_obj.get("type"));
            let actor = match payload_type.as_str() {
                "user_message" => "user",
                "agent_message" | "agent_reasoning" => "assistant",
                _ => "system",
            };
            let mut row = base_event_obj(
                ctx,
                base_uid,
                "event_msg",
                if payload_type.is_empty() {
                    "event_msg"
                } else {
                    payload_type.as_str()
                },
                actor,
                &extract_message_text(&payload),
                &payload_json,
            );
            let turn_id = to_str(payload_obj.get("turn_id"));
            if !turn_id.is_empty() {
                row.insert("request_id".to_string(), json!(turn_id.clone()));
                row.insert("item_id".to_string(), json!(turn_id));
            }
            let status = to_str(payload_obj.get("status"));
            if !status.is_empty() {
                row.insert("op_status".to_string(), json!(status));
            }
            if payload_type == "token_count" {
                // Token counts come from `info.last_token_usage`. Each cache
                // counter accepts either of two field names.
                let usage = payload_obj
                    .get("info")
                    .and_then(|v| v.get("last_token_usage"));
                let input_tokens = to_u32(usage.and_then(|v| v.get("input_tokens")));
                let output_tokens = to_u32(usage.and_then(|v| v.get("output_tokens")));
                let cache_read_tokens = to_u32(
                    usage
                        .and_then(|v| v.get("cached_input_tokens"))
                        .or_else(|| usage.and_then(|v| v.get("cache_read_input_tokens"))),
                );
                let cache_write_tokens = to_u32(
                    usage
                        .and_then(|v| v.get("cache_creation_input_tokens"))
                        .or_else(|| usage.and_then(|v| v.get("cache_write_input_tokens"))),
                );

                // Model precedence: rate_limits.limit_name, then
                // payload.model, then rate_limits.limit_id, then model_hint.
                // NOTE(review): a rate-limit name outranking payload.model
                // looks surprising. Confirm this is intended.
                let model = to_str(
                    payload_obj
                        .get("rate_limits")
                        .and_then(|v| v.get("limit_name")),
                );
                let fallback_model = to_str(payload_obj.get("model"));
                let fallback_limit_id = to_str(
                    payload_obj
                        .get("rate_limits")
                        .and_then(|v| v.get("limit_id")),
                );
                let resolved_model = if !model.is_empty() {
                    canonicalize_model("codex", &model)
                } else if !fallback_model.is_empty() {
                    canonicalize_model("codex", &fallback_model)
                } else if !fallback_limit_id.is_empty() {
                    canonicalize_model("codex", &fallback_limit_id)
                } else {
                    canonicalize_model("codex", model_hint)
                };

                row.insert("input_tokens".to_string(), json!(input_tokens));
                row.insert("output_tokens".to_string(), json!(output_tokens));
                row.insert("cache_read_tokens".to_string(), json!(cache_read_tokens));
                row.insert("cache_write_tokens".to_string(), json!(cache_write_tokens));
                if !resolved_model.is_empty() {
                    row.insert("model".to_string(), json!(resolved_model));
                }
                // `rate_limits.plan_type` is stored as the service tier.
                row.insert(
                    "service_tier".to_string(),
                    json!(to_str(payload_obj.get("rate_limits").and_then(|v| v.get("plan_type")))),
                );
                // The whole payload (not just the usage block) is kept as
                // token_usage_json.
                row.insert(
                    "token_usage_json".to_string(),
                    json!(compact_json(&payload)),
                );
            } else if payload_type == "agent_reasoning" {
                row.insert("has_reasoning".to_string(), json!(1u8));
            }
            events.push(Value::Object(row));
        }
        "compacted" => {
            // One row for the compaction record itself...
            events.push(Value::Object(base_event_obj(
                ctx,
                base_uid,
                "compacted_raw",
                "compacted",
                "system",
                "",
                &payload_json,
            )));

            // ...plus one row per `replacement_history` item. Each gets a uid
            // derived from the item's JSON and index, and a
            // `compacted_parent` link back to `base_uid`.
            if let Some(Value::Array(items)) = payload_obj.get("replacement_history") {
                for (idx, item) in items.iter().enumerate() {
                    let item_uid = event_uid(
                        ctx.source_file,
                        ctx.source_generation,
                        ctx.source_line_no,
                        ctx.source_offset,
                        &compact_json(item),
                        &format!("compacted:{}", idx),
                    );
                    let item_type = to_str(item.get("type"));

                    // (event_kind, payload_type, actor, text) per item type.
                    let (kind, payload_type, actor, text) = match item_type.as_str() {
                        "message" => (
                            "message",
                            "message",
                            to_str(item.get("role")),
                            extract_message_text(item.get("content").unwrap_or(&Value::Null)),
                        ),
                        "function_call" => (
                            "tool_call",
                            "function_call",
                            "assistant".to_string(),
                            to_str(item.get("arguments")),
                        ),
                        "function_call_output" => (
                            "tool_result",
                            "function_call_output",
                            "tool".to_string(),
                            to_str(item.get("output")),
                        ),
                        "reasoning" => (
                            "reasoning",
                            "reasoning",
                            "assistant".to_string(),
                            extract_message_text(item.get("summary").unwrap_or(&Value::Null)),
                        ),
                        _ => (
                            "unknown",
                            if item_type.is_empty() {
                                "unknown"
                            } else {
                                item_type.as_str()
                            },
                            "system".to_string(),
                            extract_message_text(item),
                        ),
                    };

                    let mut row = base_event_obj(
                        ctx,
                        &item_uid,
                        kind,
                        payload_type,
                        // Only message items can have an empty actor here;
                        // they default to the assistant.
                        if actor.is_empty() {
                            "assistant"
                        } else {
                            actor.as_str()
                        },
                        &text,
                        &compact_json(item),
                    );
                    row.insert("origin_event_id".to_string(), json!(base_uid));
                    events.push(Value::Object(row));

                    links.push(build_link_row(
                        ctx,
                        &item_uid,
                        base_uid,
                        "compacted_parent",
                        "{}",
                    ));
                }
            }
        }
        // Bare record forms without a `payload` wrapper: fields are read
        // directly from `record`, and the whole record is stored as
        // payload_json.
        "message" | "function_call" | "function_call_output" | "reasoning" => {
            let event = if top_type == "message" {
                let role = to_str(record.get("role"));
                let text = extract_message_text(record.get("content").unwrap_or(&Value::Null));
                let mut row = base_event_obj(
                    ctx,
                    base_uid,
                    "message",
                    "message",
                    if role.is_empty() {
                        "assistant"
                    } else {
                        role.as_str()
                    },
                    &text,
                    &compact_json(record),
                );
                row.insert(
                    "content_types".to_string(),
                    json!(extract_content_types(
                        record.get("content").unwrap_or(&Value::Null)
                    )),
                );
                Value::Object(row)
            } else if top_type == "function_call" {
                let args = to_str(record.get("arguments"));
                let call_id = to_str(record.get("call_id"));
                let name = to_str(record.get("name"));
                let mut row = base_event_obj(
                    ctx,
                    base_uid,
                    "tool_call",
                    "function_call",
                    "assistant",
                    &args,
                    &compact_json(record),
                );
                row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                row.insert("tool_name".to_string(), json!(name.clone()));
                tools.push(build_tool_row(
                    ctx, base_uid, &call_id, "", &name, "request", 0, &args, "", "",
                ));
                Value::Object(row)
            } else if top_type == "function_call_output" {
                let output = to_str(record.get("output"));
                let call_id = to_str(record.get("call_id"));
                let mut row = base_event_obj(
                    ctx,
                    base_uid,
                    "tool_result",
                    "function_call_output",
                    "tool",
                    &output,
                    &compact_json(record),
                );
                row.insert("tool_call_id".to_string(), json!(call_id.clone()));
                tools.push(build_tool_row(
                    ctx,
                    base_uid,
                    &call_id,
                    "",
                    "",
                    "response",
                    0,
                    "",
                    &compact_json(record.get("output").unwrap_or(&Value::Null)),
                    &output,
                ));
                Value::Object(row)
            } else {
                // top_type == "reasoning"
                let summary = record.get("summary").cloned().unwrap_or(Value::Null);
                let mut row = base_event_obj(
                    ctx,
                    base_uid,
                    "reasoning",
                    "reasoning",
                    "assistant",
                    &extract_message_text(&summary),
                    &compact_json(record),
                );
                row.insert("has_reasoning".to_string(), json!(1u8));
                Value::Object(row)
            };

            events.push(event);
        }
        _ => {
            // Unrecognized top-level type: keep the whole record as an
            // `unknown` event.
            events.push(Value::Object(base_event_obj(
                ctx,
                base_uid,
                "unknown",
                if top_type.is_empty() {
                    "unknown"
                } else {
                    top_type
                },
                "system",
                &extract_message_text(record),
                &compact_json(record),
            )));
        }
    }

    // Fill `model` on every event. A model already set on the row (e.g. by
    // turn_context or token_count) wins, then the payload's `model`, then
    // the inherited hint.
    let payload_model = canonicalize_model("codex", &to_str(payload_obj.get("model")));
    let inherited_model = canonicalize_model("codex", model_hint);
    for event in &mut events {
        if let Some(row) = event.as_object_mut() {
            let row_model = canonicalize_model("codex", &to_str(row.get("model")));
            let resolved_model = if !row_model.is_empty() {
                row_model
            } else if !payload_model.is_empty() {
                payload_model.clone()
            } else {
                inherited_model.clone()
            };

            if !resolved_model.is_empty() {
                row.insert("model".to_string(), json!(resolved_model));
            }
        }
    }

    // Only the first emitted event is linked to the record's `parent_id`.
    let parent = to_str(record.get("parent_id"));
    if !events.is_empty() && !parent.is_empty() {
        if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
            push_parent_link(&mut links, uid, &parent);
        }
    }

    (events, links, tools)
}
994
995 fn normalize_claude_event(
996 record: &Value,
997 ctx: &RecordContext<'_>,
998 top_type: &str,
999 base_uid: &str,
1000 ) -> (Vec<Value>, Vec<Value>, Vec<Value>) {
1001 let mut events = Vec::<Value>::new();
1002 let mut links = Vec::<Value>::new();
1003 let mut tools = Vec::<Value>::new();
1004
1005 let parent_uuid = to_str(record.get("parentUuid"));
1006 let request_id = to_str(record.get("requestId"));
1007 let trace_id = to_str(record.get("requestId"));
1008 let agent_run_id = to_str(record.get("agentId"));
1009 let agent_label = to_str(record.get("agentName"));
1010 let coord_group_label = to_str(record.get("teamName"));
1011 let is_substream = to_u8_bool(record.get("isSidechain"));
1012
1013 let message = record.get("message").cloned().unwrap_or(Value::Null);
1014 let msg_role = to_str(message.get("role"));
1015 let model = canonicalize_model("claude", &to_str(message.get("model")));
1016
1017 let usage = message.get("usage").cloned().unwrap_or(Value::Null);
1018 let input_tokens = to_u32(usage.get("input_tokens"));
1019 let output_tokens = to_u32(usage.get("output_tokens"));
1020 let cache_read_tokens = to_u32(usage.get("cache_read_input_tokens"));
1021 let cache_write_tokens = to_u32(usage.get("cache_creation_input_tokens"));
1022 let service_tier = to_str(usage.get("service_tier"));
1023
1024 let stamp_common = |obj: &mut Map<String, Value>| {
1025 obj.insert("request_id".to_string(), json!(request_id.clone()));
1026 obj.insert("trace_id".to_string(), json!(trace_id.clone()));
1027 obj.insert("agent_run_id".to_string(), json!(agent_run_id.clone()));
1028 obj.insert("agent_label".to_string(), json!(agent_label.clone()));
1029 obj.insert(
1030 "coord_group_label".to_string(),
1031 json!(coord_group_label.clone()),
1032 );
1033 obj.insert("is_substream".to_string(), json!(is_substream));
1034 obj.insert("model".to_string(), json!(model.clone()));
1035 obj.insert("input_tokens".to_string(), json!(input_tokens));
1036 obj.insert("output_tokens".to_string(), json!(output_tokens));
1037 obj.insert("cache_read_tokens".to_string(), json!(cache_read_tokens));
1038 obj.insert("cache_write_tokens".to_string(), json!(cache_write_tokens));
1039 obj.insert("service_tier".to_string(), json!(service_tier.clone()));
1040 obj.insert("item_id".to_string(), json!(to_str(record.get("uuid"))));
1041 obj.insert(
1042 "origin_event_id".to_string(),
1043 json!(to_str(record.get("sourceToolAssistantUUID"))),
1044 );
1045 obj.insert(
1046 "origin_tool_call_id".to_string(),
1047 json!(to_str(record.get("sourceToolUseID"))),
1048 );
1049 };
1050
1051 if top_type == "assistant" || top_type == "user" {
1052 let actor = if top_type == "assistant" {
1053 "assistant"
1054 } else if msg_role == "assistant" {
1055 "assistant"
1056 } else {
1057 "user"
1058 };
1059
1060 let content = message.get("content").cloned().unwrap_or_else(|| {
1061 record
1062 .get("message")
1063 .and_then(|m| m.get("content"))
1064 .cloned()
1065 .unwrap_or(Value::Null)
1066 });
1067
1068 match content {
1069 Value::Array(items) if !items.is_empty() => {
1070 for (idx, item) in items.iter().enumerate() {
1071 let block_type = to_str(item.get("type"));
1072 let suffix = format!("claude:block:{}", idx);
1073 let block_uid = event_uid(
1074 ctx.source_file,
1075 ctx.source_generation,
1076 ctx.source_line_no,
1077 ctx.source_offset,
1078 &compact_json(item),
1079 &suffix,
1080 );
1081
1082 let mut row = match block_type.as_str() {
1083 "thinking" => {
1084 let mut r = base_event_obj(
1085 ctx,
1086 &block_uid,
1087 "reasoning",
1088 "thinking",
1089 "assistant",
1090 &extract_message_text(item),
1091 &compact_json(item),
1092 );
1093 r.insert("has_reasoning".to_string(), json!(1u8));
1094 r.insert("content_types".to_string(), json!(["thinking"]));
1095 r
1096 }
1097 "tool_use" => {
1098 let tool_call_id = to_str(item.get("id"));
1099 let tool_name = to_str(item.get("name"));
1100 let input_json =
1101 compact_json(item.get("input").unwrap_or(&Value::Null));
1102 let mut r = base_event_obj(
1103 ctx,
1104 &block_uid,
1105 "tool_call",
1106 "tool_use",
1107 "assistant",
1108 &extract_message_text(item.get("input").unwrap_or(&Value::Null)),
1109 &compact_json(item),
1110 );
1111 r.insert("content_types".to_string(), json!(["tool_use"]));
1112 r.insert("tool_call_id".to_string(), json!(tool_call_id.clone()));
1113 r.insert("tool_name".to_string(), json!(tool_name.clone()));
1114 tools.push(build_tool_row(
1115 ctx,
1116 &block_uid,
1117 &tool_call_id,
1118 &to_str(record.get("parentToolUseID")),
1119 &tool_name,
1120 "request",
1121 0,
1122 &input_json,
1123 "",
1124 "",
1125 ));
1126 r
1127 }
1128 "tool_result" => {
1129 let tool_call_id = to_str(item.get("tool_use_id"));
1130 let output_json =
1131 compact_json(item.get("content").unwrap_or(&Value::Null));
1132 let output_text =
1133 extract_message_text(item.get("content").unwrap_or(&Value::Null));
1134 let tool_error = to_u8_bool(item.get("is_error"));
1135 let mut r = base_event_obj(
1136 ctx,
1137 &block_uid,
1138 "tool_result",
1139 "tool_result",
1140 "tool",
1141 &output_text,
1142 &compact_json(item),
1143 );
1144 r.insert("content_types".to_string(), json!(["tool_result"]));
1145 r.insert("tool_call_id".to_string(), json!(tool_call_id.clone()));
1146 r.insert("tool_error".to_string(), json!(tool_error));
1147 tools.push(build_tool_row(
1148 ctx,
1149 &block_uid,
1150 &tool_call_id,
1151 &to_str(record.get("parentToolUseID")),
1152 "",
1153 "response",
1154 tool_error,
1155 "",
1156 &output_json,
1157 &output_text,
1158 ));
1159 r
1160 }
1161 _ => {
1162 let mut r = base_event_obj(
1163 ctx,
1164 &block_uid,
1165 "message",
1166 if block_type.is_empty() {
1167 "text"
1168 } else {
1169 block_type.as_str()
1170 },
1171 actor,
1172 &extract_message_text(item),
1173 &compact_json(item),
1174 );
1175 if !block_type.is_empty() {
1176 r.insert("content_types".to_string(), json!([block_type]));
1177 }
1178 r
1179 }
1180 };
1181
1182 stamp_common(&mut row);
1183 row.insert(
1184 "parent_tool_call_id".to_string(),
1185 json!(to_str(record.get("parentToolUseID"))),
1186 );
1187 row.insert(
1188 "origin_tool_call_id".to_string(),
1189 json!(to_str(record.get("sourceToolUseID"))),
1190 );
1191 row.insert(
1192 "tool_phase".to_string(),
1193 json!(to_str(record.get("stop_reason"))),
1194 );
1195 events.push(Value::Object(row));
1196
1197 if !parent_uuid.is_empty() {
1198 links.push(build_link_row(
1199 ctx,
1200 &block_uid,
1201 &parent_uuid,
1202 "parent_uuid",
1203 "{}",
1204 ));
1205 }
1206 }
1207 }
1208 _ => {
1209 let text = extract_message_text(&message);
1210 let mut row = base_event_obj(
1211 ctx,
1212 base_uid,
1213 "message",
1214 "message",
1215 actor,
1216 &text,
1217 &compact_json(record),
1218 );
1219 row.insert(
1220 "content_types".to_string(),
1221 json!(extract_content_types(
1222 message.get("content").unwrap_or(&Value::Null)
1223 )),
1224 );
1225 stamp_common(&mut row);
1226 events.push(Value::Object(row));
1227 if !parent_uuid.is_empty() {
1228 links.push(build_link_row(
1229 ctx,
1230 base_uid,
1231 &parent_uuid,
1232 "parent_uuid",
1233 "{}",
1234 ));
1235 }
1236 }
1237 }
1238 } else {
1239 let event_kind = match top_type {
1240 "progress" => "progress",
1241 "system" => "system",
1242 "summary" => "summary",
1243 "queue-operation" => "queue_operation",
1244 "file-history-snapshot" => "file_history_snapshot",
1245 _ => "unknown",
1246 };
1247
1248 let payload_type = if top_type == "progress" {
1249 to_str(record.get("data").and_then(|d| d.get("type")))
1250 } else if top_type == "system" {
1251 to_str(record.get("subtype"))
1252 } else {
1253 top_type.to_string()
1254 };
1255
1256 let mut row = base_event_obj(
1257 ctx,
1258 base_uid,
1259 event_kind,
1260 if payload_type.is_empty() {
1261 top_type
1262 } else {
1263 payload_type.as_str()
1264 },
1265 "system",
1266 &extract_message_text(record),
1267 &compact_json(record),
1268 );
1269 row.insert("op_kind".to_string(), json!(payload_type));
1270 row.insert("op_status".to_string(), json!(to_str(record.get("status"))));
1271 row.insert(
1272 "latency_ms".to_string(),
1273 json!(to_u32(record.get("durationMs"))),
1274 );
1275 row.insert(
1276 "retry_count".to_string(),
1277 json!(to_u16(record.get("retryAttempt"))),
1278 );
1279 stamp_common(&mut row);
1280 events.push(Value::Object(row));
1281
1282 if !parent_uuid.is_empty() {
1283 links.push(build_link_row(
1284 ctx,
1285 base_uid,
1286 &parent_uuid,
1287 "parent_uuid",
1288 "{}",
1289 ));
1290 }
1291 }
1292
1293 if !events.is_empty() {
1294 let tool_use_id = to_str(record.get("toolUseID"));
1295 if !tool_use_id.is_empty() {
1296 if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
1297 links.push(build_link_row(ctx, uid, &tool_use_id, "tool_use_id", "{}"));
1298 }
1299 }
1300
1301 let source_tool_assistant = to_str(record.get("sourceToolAssistantUUID"));
1302 if !source_tool_assistant.is_empty() {
1303 if let Some(uid) = events[0].get("event_uid").and_then(|v| v.as_str()) {
1304 links.push(build_link_row(
1305 ctx,
1306 uid,
1307 &source_tool_assistant,
1308 "source_tool_assistant",
1309 "{}",
1310 ));
1311 }
1312 }
1313 }
1314
1315 (events, links, tools)
1316 }
1317
1318 pub fn normalize_record(
1319 record: &Value,
1320 source_name: &str,
1321 provider: &str,
1322 source_file: &str,
1323 source_inode: u64,
1324 source_generation: u32,
1325 source_line_no: u64,
1326 source_offset: u64,
1327 session_hint: &str,
1328 model_hint: &str,
1329 ) -> NormalizedRecord {
1330 let record_ts = to_str(record.get("timestamp"));
1331 let event_ts = parse_event_ts(&record_ts);
1332 let top_type = to_str(record.get("type"));
1333
1334 let mut session_id = if provider == "claude" {
1335 to_str(record.get("sessionId"))
1336 } else {
1337 String::new()
1338 };
1339 if session_id.is_empty() {
1340 session_id = if session_hint.is_empty() {
1341 infer_session_id_from_file(source_file)
1342 } else {
1343 session_hint.to_string()
1344 };
1345 }
1346
1347 if provider == "codex" && top_type == "session_meta" {
1348 let payload = record.get("payload").cloned().unwrap_or(Value::Null);
1349 let payload_id = to_str(payload.get("id"));
1350 if !payload_id.is_empty() {
1351 session_id = payload_id;
1352 }
1353 }
1354
1355 let session_date = infer_session_date_from_file(source_file, &record_ts);
1356
1357 let raw_json = compact_json(record);
1358 let base_uid = event_uid(
1359 source_file,
1360 source_generation,
1361 source_line_no,
1362 source_offset,
1363 &raw_json,
1364 "raw",
1365 );
1366
1367 let raw_row = json!({
1368 "source_name": source_name,
1369 "provider": provider,
1370 "source_file": source_file,
1371 "source_inode": source_inode,
1372 "source_generation": source_generation,
1373 "source_line_no": source_line_no,
1374 "source_offset": source_offset,
1375 "record_ts": record_ts,
1376 "top_type": top_type,
1377 "session_id": session_id,
1378 "raw_json": raw_json,
1379 "raw_json_hash": raw_hash(&raw_json),
1380 "event_uid": base_uid,
1381 });
1382
1383 let ctx = RecordContext {
1384 source_name,
1385 provider,
1386 session_id: &session_id,
1387 session_date: &session_date,
1388 source_file,
1389 source_inode,
1390 source_generation,
1391 source_line_no,
1392 source_offset,
1393 record_ts: &record_ts,
1394 event_ts: &event_ts,
1395 };
1396
1397 let (event_rows, link_rows, tool_rows) = if provider == "claude" {
1398 normalize_claude_event(record, &ctx, &top_type, &base_uid)
1399 } else {
1400 normalize_codex_event(record, &ctx, &top_type, &base_uid, model_hint)
1401 };
1402 let model_hint = resolve_model_hint(&event_rows, provider, model_hint);
1403
1404 NormalizedRecord {
1405 raw_row,
1406 event_rows,
1407 link_rows,
1408 tool_rows,
1409 session_hint: session_id,
1410 model_hint,
1411 }
1412 }
1413
#[cfg(test)]
mod tests {
    use super::normalize_record;
    use serde_json::json;

    // A Codex `function_call` response item becomes one tool_call event plus one tool row.
    #[test]
    fn codex_tool_call_normalization() {
        let record = json!({
            "timestamp": "2026-02-14T02:28:00.000Z",
            "type": "response_item",
            "payload": {
                "type": "function_call",
                "call_id": "call_123",
                "name": "Read",
                "arguments": "{\"path\":\"README.md\"}"
            }
        });

        let out = normalize_record(
            &record,
            "codex",
            "codex",
            "/Users/eric/.codex/sessions/2026/02/13/session-019c59f9-6389-77a1-a0cb-304eecf935b6.jsonl",
            123,
            1,
            42,
            1024,
            "",
            "",
        );

        assert_eq!(out.event_rows.len(), 1);
        assert_eq!(out.tool_rows.len(), 1);
        let row = out.event_rows[0].as_object().unwrap();
        assert_eq!(
            row.get("event_kind").unwrap().as_str().unwrap(),
            "tool_call"
        );
        assert_eq!(row.get("tool_name").unwrap().as_str().unwrap(), "Read");
    }

    // `turn_context` carries the model and turn id, which are promoted onto the event row.
    #[test]
    fn codex_turn_context_promotes_model_and_turn_id() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:42.191Z",
            "type": "turn_context",
            "payload": {
                "turn_id": "019c5f6a-49bd-7920-ac67-1dd8e33b0e95",
                "model": "gpt-5.3-codex"
            }
        });

        let out = normalize_record(
            &record,
            "codex",
            "codex",
            "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
            1,
            1,
            1,
            1,
            "",
            "",
        );

        let row = out.event_rows[0].as_object().unwrap();
        assert_eq!(row.get("payload_type").unwrap().as_str().unwrap(), "turn_context");
        assert_eq!(row.get("model").unwrap().as_str().unwrap(), "gpt-5.3-codex");
        assert_eq!(
            row.get("request_id").unwrap().as_str().unwrap(),
            "019c5f6a-49bd-7920-ac67-1dd8e33b0e95"
        );
        assert_eq!(
            row.get("item_id").unwrap().as_str().unwrap(),
            "019c5f6a-49bd-7920-ac67-1dd8e33b0e95"
        );
    }

    // `token_count` usage and rate-limit info are promoted into typed columns.
    #[test]
    fn codex_token_count_promotes_usage_fields() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:50.838Z",
            "type": "event_msg",
            "payload": {
                "type": "token_count",
                "info": {
                    "last_token_usage": {
                        "input_tokens": 65323,
                        "output_tokens": 445,
                        "cached_input_tokens": 58624
                    }
                },
                "rate_limits": {
                    "limit_name": "GPT-5.3-Codex-Spark",
                    "limit_id": "codex_bengalfox",
                    "plan_type": "pro"
                }
            }
        });

        let out = normalize_record(
            &record,
            "codex",
            "codex",
            "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
            1,
            1,
            2,
            2,
            "",
            "",
        );

        let row = out.event_rows[0].as_object().unwrap();
        assert_eq!(row.get("payload_type").unwrap().as_str().unwrap(), "token_count");
        assert_eq!(row.get("input_tokens").unwrap().as_u64().unwrap(), 65323);
        assert_eq!(row.get("output_tokens").unwrap().as_u64().unwrap(), 445);
        assert_eq!(row.get("cache_read_tokens").unwrap().as_u64().unwrap(), 58624);
        assert_eq!(
            row.get("model").unwrap().as_str().unwrap(),
            "gpt-5.3-codex-spark"
        );
        assert_eq!(row.get("service_tier").unwrap().as_str().unwrap(), "pro");
        assert!(
            !row.get("token_usage_json")
                .unwrap()
                .as_str()
                .unwrap()
                .is_empty()
        );
    }

    // A bare `codex` limit id with no limit name maps to the xhigh model alias.
    #[test]
    fn codex_token_count_alias_codex_maps_to_xhigh() {
        let record = json!({
            "timestamp": "2026-02-15T04:52:55.538Z",
            "type": "event_msg",
            "payload": {
                "type": "token_count",
                "info": {
                    "last_token_usage": {
                        "input_tokens": 72636,
                        "output_tokens": 285,
                        "cached_input_tokens": 70784
                    }
                },
                "rate_limits": {
                    "limit_id": "codex",
                    "limit_name": null,
                    "plan_type": "pro"
                }
            }
        });

        let out = normalize_record(
            &record,
            "codex",
            "codex",
            "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
            1,
            1,
            4,
            4,
            "",
            "",
        );

        let row = out.event_rows[0].as_object().unwrap();
        assert_eq!(
            row.get("model").unwrap().as_str().unwrap(),
            "gpt-5.3-codex-xhigh"
        );
    }

    // `custom_tool_call` items are treated like function calls, including their status.
    #[test]
    fn codex_custom_tool_call_promotes_tool_fields() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:50.838Z",
            "type": "response_item",
            "payload": {
                "type": "custom_tool_call",
                "call_id": "call_abc",
                "name": "apply_patch",
                "status": "completed",
                "input": "*** Begin Patch\n*** End Patch\n"
            }
        });

        let out = normalize_record(
            &record,
            "codex",
            "codex",
            "/Users/eric/.codex/sessions/2026/02/15/session-019c5f6a-49bd-7920-ac67-1dd8e33b0e95.jsonl",
            1,
            1,
            3,
            3,
            "",
            "",
        );

        assert_eq!(out.event_rows.len(), 1);
        assert_eq!(out.tool_rows.len(), 1);
        let row = out.event_rows[0].as_object().unwrap();
        assert_eq!(
            row.get("event_kind").unwrap().as_str().unwrap(),
            "tool_call"
        );
        assert_eq!(
            row.get("tool_call_id").unwrap().as_str().unwrap(),
            "call_abc"
        );
        assert_eq!(
            row.get("tool_name").unwrap().as_str().unwrap(),
            "apply_patch"
        );
        assert_eq!(
            row.get("op_status").unwrap().as_str().unwrap(),
            "completed"
        );
    }

    // A `session_meta` record's `payload.id` wins over both the hint and the file name.
    #[test]
    fn codex_session_meta_overrides_session_hint() {
        let record = json!({
            "timestamp": "2026-02-15T03:50:40.000Z",
            "type": "session_meta",
            "payload": {
                "id": "019c5f6a-49bd-7920-ac67-1dd8e33b0e95"
            }
        });

        let out = normalize_record(
            &record,
            "codex",
            "codex",
            "/Users/eric/.codex/sessions/2026/02/13/session-019c59f9-6389-77a1-a0cb-304eecf935b6.jsonl",
            1,
            1,
            0,
            0,
            "stale-hint",
            "",
        );

        assert_eq!(out.session_hint, "019c5f6a-49bd-7920-ac67-1dd8e33b0e95");
        assert_eq!(
            out.raw_row.get("session_id").unwrap().as_str().unwrap(),
            "019c5f6a-49bd-7920-ac67-1dd8e33b0e95"
        );
    }

    // An assistant message with a `tool_use` block and a `text` block yields one event per
    // block; only the `tool_use` block produces a tool row.
    #[test]
    fn claude_tool_use_and_text_blocks() {
        let record = json!({
            "type": "assistant",
            "sessionId": "7c666c01-d38e-4658-8650-854ffb5b626e",
            "uuid": "assistant-1",
            "parentUuid": "user-1",
            "requestId": "req-1",
            "timestamp": "2026-01-19T15:58:41.421Z",
            "message": {
                "model": "claude-opus-4-5-20251101",
                "role": "assistant",
                "usage": {
                    "input_tokens": 9,
                    "output_tokens": 5,
                    "cache_creation_input_tokens": 19630,
                    "cache_read_input_tokens": 0,
                    "service_tier": "standard"
                },
                "content": [
                    {
                        "type": "tool_use",
                        "id": "toolu_1",
                        "name": "WebFetch",
                        "input": {"url": "https://example.com"}
                    },
                    {
                        "type": "text",
                        "text": "done"
                    }
                ]
            }
        });

        let out = normalize_record(
            &record,
            "claude",
            "claude",
            "/Users/eric/.claude/projects/p1/s1.jsonl",
            55,
            2,
            10,
            100,
            "",
            "",
        );

        assert_eq!(out.event_rows.len(), 2);
        assert_eq!(out.tool_rows.len(), 1);
        assert_eq!(out.session_hint, "7c666c01-d38e-4658-8650-854ffb5b626e");

        let first = out.event_rows[0].as_object().unwrap();
        assert_eq!(
            first.get("event_kind").unwrap().as_str().unwrap(),
            "tool_call"
        );
        assert_eq!(first.get("provider").unwrap().as_str().unwrap(), "claude");
        assert_eq!(
            first.get("tool_call_id").unwrap().as_str().unwrap(),
            "toolu_1"
        );
        assert_eq!(
            first.get("tool_name").unwrap().as_str().unwrap(),
            "WebFetch"
        );

        let second = out.event_rows[1].as_object().unwrap();
        assert_eq!(
            second.get("event_kind").unwrap().as_str().unwrap(),
            "message"
        );
        assert_eq!(second.get("payload_type").unwrap().as_str().unwrap(), "text");

        // The block index is part of each block's uid suffix, so the two uids must differ.
        assert_ne!(first.get("event_uid"), second.get("event_uid"));
    }

    // A user message carrying a failed `tool_result` block yields one tool_result event and
    // one tool response row, with the error flag carried over.
    #[test]
    fn claude_tool_result_block() {
        let record = json!({
            "type": "user",
            "sessionId": "7c666c01-d38e-4658-8650-854ffb5b626e",
            "uuid": "user-2",
            "parentUuid": "assistant-1",
            "timestamp": "2026-01-19T15:58:42.000Z",
            "message": {
                "role": "user",
                "content": [
                    {
                        "type": "tool_result",
                        "tool_use_id": "toolu_1",
                        "content": "fetch failed",
                        "is_error": true
                    }
                ]
            }
        });

        let out = normalize_record(
            &record,
            "claude",
            "claude",
            "/Users/eric/.claude/projects/p1/s1.jsonl",
            55,
            2,
            11,
            200,
            "",
            "",
        );

        assert_eq!(out.event_rows.len(), 1);
        assert_eq!(out.tool_rows.len(), 1);
        let row = out.event_rows[0].as_object().unwrap();
        assert_eq!(
            row.get("event_kind").unwrap().as_str().unwrap(),
            "tool_result"
        );
        assert_eq!(
            row.get("tool_call_id").unwrap().as_str().unwrap(),
            "toolu_1"
        );
        assert_eq!(row.get("tool_error").unwrap().as_u64().unwrap(), 1);
    }

    // Without an inline `sessionId`, a Claude record falls back to the caller's hint.
    #[test]
    fn claude_session_hint_used_when_session_id_missing() {
        let record = json!({
            "type": "summary",
            "timestamp": "2026-01-19T15:58:40.000Z",
            "summary": "Ingestor refactor",
            "leafUuid": "assistant-1"
        });

        let out = normalize_record(
            &record,
            "claude",
            "claude",
            "/Users/eric/.claude/projects/p1/s1.jsonl",
            55,
            2,
            1,
            0,
            "hint-session",
            "",
        );

        assert_eq!(out.session_hint, "hint-session");
        assert_eq!(
            out.raw_row.get("session_id").unwrap().as_str().unwrap(),
            "hint-session"
        );
    }
}