// crates/moraine-conversations/src/clickhouse_repo.rs
1 use std::sync::{Arc, OnceLock};
2 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
3
4 use ahash::AHashMap as HashMap;
5 use anyhow::Result as AnyResult;
6 use async_trait::async_trait;
7 use moraine_clickhouse::ClickHouseClient;
8 use regex::Regex;
9 use serde::Deserialize;
10 use serde_json::{json, Value};
11 use tokio::sync::RwLock;
12 use tracing::warn;
13 use uuid::Uuid;
14
15 use crate::cursor::{decode_cursor, encode_cursor, ConversationCursor, TurnCursor};
16 use crate::domain::{
17 Conversation, ConversationDetailOptions, ConversationListFilter, ConversationMode,
18 ConversationSearchHit, ConversationSearchQuery, ConversationSearchResults,
19 ConversationSearchStats, ConversationSummary, OpenContext, OpenEvent, OpenEventRequest, Page,
20 PageRequest, RepoConfig, SearchEventHit, SearchEventKind, SearchEventsQuery,
21 SearchEventsResult, SearchEventsStats, SearchEventsStrategy, TraceEvent, Turn, TurnListFilter,
22 TurnSummary,
23 };
24 use crate::error::{RepoError, RepoResult};
25 use crate::repo::ConversationRepository;
26
/// ClickHouse-backed implementation of the conversation repository.
///
/// Cheap to clone: the client and every cache are behind `Arc`s, so clones
/// share the same connection pool and cache state. Each cache is guarded by
/// a `tokio::sync::RwLock` so concurrent readers do not block each other.
#[derive(Clone)]
pub struct ClickHouseConversationRepository {
    // Shared ClickHouse client used for all queries.
    ch: ClickHouseClient,
    // Repository tuning knobs (BM25 parameters, preview length, ...).
    cfg: RepoConfig,
    // Corpus stats, per-term document frequencies, and schema probes.
    stats_cache: Arc<RwLock<SearchStatsCache>>,
    // Full search results keyed by the normalized query signature.
    search_cache: Arc<RwLock<HashMap<String, SearchEventsCacheEntry>>>,
    // Postings rows keyed by term, for the in-process scoring path.
    term_postings_cache: Arc<RwLock<HashMap<String, TermPostingsCacheEntry>>>,
    // Per-document metadata used to hydrate search hits, keyed by event_uid.
    search_doc_extra_cache: Arc<RwLock<HashMap<String, SearchDocExtraCacheEntry>>>,
}
36
/// Source tag attached to prewarm searches so they are distinguishable from
/// real traffic in the query log.
const BENCHMARK_REPLAY_SOURCE: &str = "benchmark-replay";
/// TTL for cached corpus-wide (docs, total_doc_len) statistics.
const CORPUS_STATS_CACHE_TTL: Duration = Duration::from_secs(30);
/// TTL for cached per-term document frequencies.
const TERM_DF_CACHE_TTL: Duration = Duration::from_secs(300);
/// TTL for the cached "does this column exist" schema probe.
const SEARCH_SCHEMA_CACHE_TTL: Duration = Duration::from_secs(60);
/// TTL and capacity bound for cached full search results.
const SEARCH_RESULT_CACHE_TTL: Duration = Duration::from_secs(15);
const SEARCH_RESULT_CACHE_MAX_ENTRIES: usize = 256;
/// TTL and size bounds for the per-term postings cache; both a per-term row
/// cap and a total row budget keep memory bounded for very common terms.
const TERM_POSTINGS_CACHE_TTL: Duration = Duration::from_secs(15);
const TERM_POSTINGS_CACHE_MAX_ENTRIES: usize = 2048;
const TERM_POSTINGS_CACHE_MAX_ROWS_PER_TERM: usize = 131_072;
const TERM_POSTINGS_CACHE_MAX_ROWS_TOTAL: usize = 262_144;
/// TTL and capacity for cached per-document hydration metadata.
const SEARCH_DOC_EXTRA_CACHE_TTL: Duration = Duration::from_secs(15);
const SEARCH_DOC_EXTRA_CACHE_MAX_ENTRIES: usize = 65536;
/// Bounds for how many candidate events a conversation-level search fans out
/// to: at least MIN, `limit * MULTIPLIER` in between, never more than MAX.
const CONVERSATION_CANDIDATE_MIN: usize = 512;
const CONVERSATION_CANDIDATE_MULTIPLIER: usize = 80;
const CONVERSATION_CANDIDATE_MAX: usize = 20_000;
/// Window and cap used to also consider very recent sessions that the
/// candidate query may have missed.
const CONVERSATION_RECENT_WINDOW_MS: i64 = 45_000;
const CONVERSATION_RECENT_CANDIDATE_LIMIT: usize = 1024;
54
/// Row shape for session-level summaries (session summary view joined with
/// the mode subquery), deserialized from ClickHouse `JSONEachRow` output.
#[derive(Debug, Clone, Deserialize)]
struct ConversationSummaryRow {
    session_id: String,
    first_event_time: String,
    first_event_unix_ms: i64,
    last_event_time: String,
    last_event_unix_ms: i64,
    total_turns: u32,
    total_events: u64,
    user_messages: u64,
    assistant_messages: u64,
    tool_calls: u64,
    tool_results: u64,
    // Raw mode string; mapped onto `ConversationMode` by `parse_mode`.
    mode: String,
}

/// Row shape for per-turn summaries from the turn summary view.
#[derive(Debug, Clone, Deserialize)]
struct TurnSummaryRow {
    session_id: String,
    turn_seq: u32,
    turn_id: String,
    started_at: String,
    started_at_unix_ms: i64,
    ended_at: String,
    ended_at_unix_ms: i64,
    total_events: u64,
    user_messages: u64,
    assistant_messages: u64,
    tool_calls: u64,
    tool_results: u64,
    reasoning_items: u64,
}

/// Row shape for a full trace event, including the raw payload and
/// token-usage JSON blobs as unparsed strings.
#[derive(Debug, Deserialize)]
struct TraceEventRow {
    session_id: String,
    event_uid: String,
    event_order: u64,
    turn_seq: u32,
    event_time: String,
    actor_role: String,
    event_class: String,
    payload_type: String,
    call_id: String,
    name: String,
    phase: String,
    item_id: String,
    source_ref: String,
    text_content: String,
    payload_json: String,
    token_usage_json: String,
}
107
/// Minimal locator for an "open" target: a specific event within a session.
#[derive(Debug, Deserialize)]
struct OpenTargetRow {
    session_id: String,
    event_order: u64,
    turn_seq: u32,
}

/// One scored search hit as returned by the SQL-side BM25 query.
#[derive(Debug, Clone, Deserialize)]
struct SearchRow {
    event_uid: String,
    session_id: String,
    source_name: String,
    provider: String,
    event_class: String,
    payload_type: String,
    actor_role: String,
    name: String,
    phase: String,
    source_ref: String,
    // Document length in tokens, used by BM25 normalization.
    doc_len: u32,
    text_preview: String,
    score: f64,
    matched_terms: u64,
}

/// Posting row as stored in the in-process term-postings cache
/// (term is implied by the cache key).
#[derive(Debug, Clone, Deserialize)]
struct CachedPostingRow {
    event_uid: String,
    doc_len: u32,
    // Term frequency within the document.
    tf: u16,
}

/// Posting row as fetched from ClickHouse, carrying its term explicitly so
/// one result set can be split per term.
#[derive(Debug, Clone, Deserialize)]
struct FetchedPostingRow {
    term: String,
    event_uid: String,
    doc_len: u32,
    tf: u16,
}

/// Per-document metadata used to hydrate in-process search hits.
#[derive(Debug, Clone, Deserialize)]
struct SearchDocExtraRow {
    event_uid: String,
    session_id: String,
    source_name: String,
    provider: String,
    event_class: String,
    payload_type: String,
    actor_role: String,
    name: String,
    phase: String,
    source_ref: String,
    doc_len: u32,
    text_preview: String,
    // 1 when the document references codex-mcp, 0 otherwise.
    has_codex_mcp: u8,
}
164
/// Aggregate corpus statistics: document count and summed document lengths.
#[derive(Debug, Deserialize)]
struct CorpusStatsRow {
    docs: u64,
    total_doc_len: u64,
}

/// Per-term document frequency row.
#[derive(Debug, Deserialize)]
struct DfRow {
    term: String,
    df: u64,
}

/// A popular historical query, used for cache prewarming.
#[derive(Debug, Deserialize)]
struct HotQueryRow {
    raw_query: String,
}

/// One conversation-level search hit. Most metadata fields default when the
/// producing query omits them (they are backfilled later).
#[derive(Debug, Deserialize)]
struct ConversationSearchRow {
    session_id: String,
    #[serde(default)]
    first_event_time: String,
    #[serde(default)]
    first_event_unix_ms: i64,
    #[serde(default)]
    last_event_time: String,
    #[serde(default)]
    last_event_unix_ms: i64,
    #[serde(default)]
    provider: String,
    score: f64,
    matched_terms: u16,
    // How many events contributed to this session's aggregate score.
    event_count_considered: u32,
    // Highest-scoring event within the session.
    best_event_uid: String,
    #[serde(default)]
    snippet: String,
}

/// Optional per-session descriptive metadata; all fields may be absent.
#[derive(Debug, Deserialize)]
struct ConversationSessionMetadataRow {
    session_id: String,
    #[serde(default)]
    provider: String,
    #[serde(default)]
    session_slug: String,
    #[serde(default)]
    session_summary: String,
}

/// Snippet text for a specific event, fetched in a follow-up query.
#[derive(Debug, Deserialize)]
struct ConversationSnippetRow {
    event_uid: String,
    snippet: String,
}

/// First/last event timestamps for a session.
#[derive(Debug, Deserialize)]
struct SessionTimeBoundsRow {
    session_id: String,
    first_event_time: String,
    last_event_time: String,
}

/// A candidate session with its aggregate score, prior to hydration.
#[derive(Debug, Deserialize)]
struct ConversationCandidateRow {
    session_id: String,
    score: f64,
    matched_terms: u16,
}
233
/// Candidate sessions for a conversation search, plus whether the candidate
/// query hit its row limit (results may then be incomplete).
#[derive(Debug, Default)]
struct ConversationCandidateSet {
    rows: Vec<ConversationCandidateRow>,
    truncated: bool,
}

/// Owned copy of a session's first/last event timestamps.
#[derive(Debug, Clone, Default)]
struct SessionTimeBounds {
    first_event_time: String,
    last_event_time: String,
}

/// Result of a schema probe: 1 when the column exists, 0 otherwise.
#[derive(Debug, Deserialize)]
struct ColumnExistsRow {
    exists: u8,
}

/// Cached corpus statistics with their fetch time for TTL checks.
#[derive(Debug, Clone)]
struct CorpusStatsCacheEntry {
    docs: u64,
    total_doc_len: u64,
    fetched_at: Instant,
}

/// Cached document frequency for one term.
#[derive(Debug, Clone)]
struct TermDfCacheEntry {
    df: u64,
    fetched_at: Instant,
}

/// Umbrella cache for search scoring inputs: corpus stats, per-term document
/// frequencies, and the result of the has_codex_mcp column probe.
#[derive(Debug, Default)]
struct SearchStatsCache {
    corpus_stats: Option<CorpusStatsCacheEntry>,
    term_df_by_term: HashMap<String, TermDfCacheEntry>,
    has_codex_flag_column: Option<(bool, Instant)>,
}

/// Cached full result list for one search signature.
#[derive(Debug, Clone)]
struct SearchEventsCacheEntry {
    hits: Vec<SearchEventHit>,
    fetched_at: Instant,
}

/// Cached postings for one term. `Arc<[_]>` makes clones O(1) so readers can
/// hold the rows without copying them out of the cache.
#[derive(Debug, Clone)]
struct TermPostingsCacheEntry {
    rows: Arc<[CachedPostingRow]>,
    fetched_at: Instant,
}

/// Cached hydration metadata for one document (keyed by event_uid).
#[derive(Debug, Clone)]
struct SearchDocExtraCacheEntry {
    session_id: String,
    source_name: String,
    provider: String,
    event_class: String,
    payload_type: String,
    actor_role: String,
    name: String,
    phase: String,
    source_ref: String,
    doc_len: u32,
    text_preview: String,
    has_codex_mcp: u8,
    fetched_at: Instant,
}

/// Running BM25 accumulator for one document during in-process scoring:
/// the posting row, the score so far, and a bitmask of matched query terms.
#[derive(Debug, Clone, Copy)]
struct SearchScoreAccum<'a> {
    row: &'a CachedPostingRow,
    score: f64,
    matched_mask: u64,
}
306
307 impl ClickHouseConversationRepository {
308 pub fn new(ch: ClickHouseClient, cfg: RepoConfig) -> Self {
309 Self {
310 ch,
311 cfg,
312 stats_cache: Arc::new(RwLock::new(SearchStatsCache::default())),
313 search_cache: Arc::new(RwLock::new(HashMap::new())),
314 term_postings_cache: Arc::new(RwLock::new(HashMap::new())),
315 search_doc_extra_cache: Arc::new(RwLock::new(HashMap::new())),
316 }
317 }
318
    /// Returns the repository configuration this instance was built with.
    pub fn config(&self) -> &RepoConfig {
        &self.cfg
    }
322
323 async fn run_mcp_search_prewarm_queries(
324 &self,
325 queries: impl IntoIterator<Item = String>,
326 limit: u16,
327 ) {
328 for query in queries {
329 if query.trim().is_empty() {
330 continue;
331 }
332 if let Err(err) = self
333 .search_events(SearchEventsQuery {
334 query,
335 source: Some(BENCHMARK_REPLAY_SOURCE.to_string()),
336 limit: Some(limit),
337 session_id: None,
338 min_score: None,
339 min_should_match: None,
340 include_tool_events: None,
341 event_kinds: None,
342 exclude_codex_mcp: None,
343 disable_cache: Some(false),
344 search_strategy: Some(SearchEventsStrategy::Optimized),
345 })
346 .await
347 {
348 warn!("mcp prewarm query failed: {}", err);
349 }
350 }
351 }
352
353 pub async fn prewarm_mcp_search_state_quick(&self) -> RepoResult<()> {
354 // Keep synchronous initialize prewarm deterministic and bounded.
355 // Variable hot-query prewarm stays in the async background path.
356 const PREWARM_QUERY: &str = "the";
357 const PREWARM_LIMITS: [u16; 2] = [1, 25];
358
359 for limit in PREWARM_LIMITS {
360 if let Err(err) = self
361 .search_events(SearchEventsQuery {
362 query: PREWARM_QUERY.to_string(),
363 source: Some(BENCHMARK_REPLAY_SOURCE.to_string()),
364 limit: Some(limit),
365 session_id: None,
366 min_score: None,
367 min_should_match: None,
368 include_tool_events: None,
369 event_kinds: None,
370 exclude_codex_mcp: None,
371 disable_cache: Some(false),
372 search_strategy: Some(SearchEventsStrategy::Optimized),
373 })
374 .await
375 {
376 warn!("mcp quick prewarm query failed: {}", err);
377 }
378 }
379
380 Ok(())
381 }
382
383 pub async fn prewarm_mcp_search_state(&self) -> RepoResult<()> {
384 const PREWARM_QUERY_LIMIT: u16 = 10;
385 const PREWARM_HOT_QUERY_COUNT: usize = 6;
386 const PREWARM_FALLBACK_QUERIES: [&str; 5] = [
387 "the",
388 "error",
389 "test",
390 "file directory path config",
391 "function code implementation",
392 ];
393
394 let mut queries = self
395 .load_hot_queries_for_prewarm(PREWARM_HOT_QUERY_COUNT)
396 .await?;
397 for fallback in PREWARM_FALLBACK_QUERIES {
398 if !queries.iter().any(|existing| existing == fallback) {
399 queries.push(fallback.to_string());
400 }
401 }
402
403 self.run_mcp_search_prewarm_queries(queries, PREWARM_QUERY_LIMIT)
404 .await;
405
406 Ok(())
407 }
408
409 fn table_ref(&self, table: &str) -> String {
410 format!(
411 "{}.{}",
412 sql_identifier(&self.ch.config().database),
413 sql_identifier(table)
414 )
415 }
416
    /// Converts an `anyhow` result from the ClickHouse client into the
    /// repository error type, stringifying the error as a backend failure.
    fn map_backend<T>(&self, result: AnyResult<T>) -> RepoResult<T> {
        result.map_err(|err| RepoError::backend(err.to_string()))
    }
420
421 fn search_events_cache_key(
422 terms: &[String],
423 search_strategy: SearchEventsStrategy,
424 include_tool_events: bool,
425 event_kinds: Option<&[SearchEventKind]>,
426 exclude_codex_mcp: bool,
427 session_id: Option<&str>,
428 min_should_match: u16,
429 min_score: f64,
430 limit: u16,
431 ) -> String {
432 let mut cache_terms = terms.to_vec();
433 cache_terms.sort_unstable();
434 let event_kind_sig = event_kinds
435 .map(|kinds| {
436 kinds
437 .iter()
438 .map(|kind| kind.as_str())
439 .collect::<Vec<_>>()
440 .join(",")
441 })
442 .unwrap_or_default();
443 format!(
444 "strategy={};incl_tools={include_tool_events};event_kinds={event_kind_sig};excl_codex={exclude_codex_mcp};session={};msm={min_should_match};min_score={min_score:.12};limit={limit};terms={}",
445 search_strategy.as_str(),
446 session_id.unwrap_or(""),
447 cache_terms.join(",")
448 )
449 }
450
    /// Looks up cached search results for `key`, enforcing the TTL.
    ///
    /// Fast path: under the read lock, return a clone of a still-fresh entry,
    /// or return `None` immediately when the key is absent. Slow path: the
    /// read lock saw an expired entry, so re-check under the write lock
    /// (another task may have refreshed it in between) before evicting.
    async fn search_events_cache_get(&self, key: &str) -> Option<Vec<SearchEventHit>> {
        let now = Instant::now();
        {
            let cache = self.search_cache.read().await;
            if let Some(entry) = cache.get(key) {
                if now.duration_since(entry.fetched_at) <= SEARCH_RESULT_CACHE_TTL {
                    return Some(entry.hits.clone());
                }
            } else {
                // Key absent: no need to take the write lock at all.
                return None;
            }
        }

        // The entry looked expired under the read lock; double-check under
        // the write lock, since a concurrent writer may have refreshed it.
        let mut cache = self.search_cache.write().await;
        if let Some(entry) = cache.get(key) {
            if now.duration_since(entry.fetched_at) <= SEARCH_RESULT_CACHE_TTL {
                return Some(entry.hits.clone());
            }
        }
        // Still expired (or gone): drop the stale entry.
        cache.remove(key);
        None
    }
473
474 async fn search_events_cache_put(&self, key: String, hits: &[SearchEventHit]) {
475 let now = Instant::now();
476 let mut cache = self.search_cache.write().await;
477 cache.retain(|_, entry| now.duration_since(entry.fetched_at) <= SEARCH_RESULT_CACHE_TTL);
478
479 if cache.len() >= SEARCH_RESULT_CACHE_MAX_ENTRIES {
480 if let Some(oldest_key) = cache
481 .iter()
482 .min_by_key(|(_, entry)| entry.fetched_at)
483 .map(|(k, _)| k.clone())
484 {
485 cache.remove(&oldest_key);
486 }
487 }
488
489 cache.insert(
490 key,
491 SearchEventsCacheEntry {
492 hits: hits.to_vec(),
493 fetched_at: now,
494 },
495 );
496 }
497
    /// Builds the per-session mode classification subquery.
    ///
    /// Modes are assigned by priority via `multiIf`: any web-search activity
    /// wins, then codex-mcp / search-open tool usage ('mcp_internal'), then
    /// any other tool activity ('tool_calling'), defaulting to 'chat'. The
    /// emitted string values must stay in sync with `parse_mode`.
    fn mode_subquery(&self) -> String {
        let events_table = self.table_ref("events");
        format!(
            "SELECT
                session_id,
                multiIf(
                    countIf(
                        payload_type = 'web_search_call'
                        OR payload_type = 'search_results_received'
                        OR (payload_type = 'tool_use' AND tool_name IN ('WebSearch', 'WebFetch'))
                    ) > 0,
                    'web_search',
                    countIf(source_name = 'codex-mcp' OR lowerUTF8(tool_name) IN ('search', 'open')) > 0,
                    'mcp_internal',
                    countIf(event_kind IN ('tool_call', 'tool_result') OR payload_type = 'tool_use') > 0,
                    'tool_calling',
                    'chat'
                ) AS mode
            FROM {events_table}
            GROUP BY session_id"
        )
    }
520
521 fn parse_mode(raw: &str) -> ConversationMode {
522 match raw {
523 "web_search" => ConversationMode::WebSearch,
524 "mcp_internal" => ConversationMode::McpInternal,
525 "tool_calling" => ConversationMode::ToolCalling,
526 _ => ConversationMode::Chat,
527 }
528 }
529
530 fn conversation_filter_sig(filter: &ConversationListFilter) -> String {
531 format!(
532 "from={:?};to={:?};mode={}",
533 filter.from_unix_ms,
534 filter.to_unix_ms,
535 filter
536 .mode
537 .map(ConversationMode::as_str)
538 .unwrap_or("__none__")
539 )
540 }
541
542 fn turn_filter_sig(session_id: &str, filter: &TurnListFilter) -> String {
543 format!(
544 "session={};from={:?};to={:?}",
545 session_id, filter.from_turn_seq, filter.to_turn_seq
546 )
547 }
548
549 fn validate_time_bounds(from_unix_ms: Option<i64>, to_unix_ms: Option<i64>) -> RepoResult<()> {
550 if let (Some(from), Some(to)) = (from_unix_ms, to_unix_ms) {
551 if from >= to {
552 return Err(RepoError::invalid_argument(
553 "from_unix_ms must be strictly less than to_unix_ms",
554 ));
555 }
556 }
557 Ok(())
558 }
559
560 fn validate_session_id(session_id: &str) -> RepoResult<()> {
561 if !is_safe_filter_value(session_id) {
562 return Err(RepoError::invalid_argument(
563 "session_id contains unsupported characters",
564 ));
565 }
566 Ok(())
567 }
568
569 fn validate_event_uid(event_uid: &str) -> RepoResult<()> {
570 if !is_safe_filter_value(event_uid) {
571 return Err(RepoError::invalid_argument(
572 "event_uid contains unsupported characters",
573 ));
574 }
575 Ok(())
576 }
577
578 fn normalize_event_kinds(
579 event_kinds: Option<Vec<SearchEventKind>>,
580 ) -> RepoResult<Option<Vec<SearchEventKind>>> {
581 let Some(mut kinds) = event_kinds else {
582 return Ok(None);
583 };
584
585 if kinds.is_empty() {
586 return Err(RepoError::invalid_argument(
587 "event_kind filter cannot be an empty list",
588 ));
589 }
590
591 kinds.sort_unstable();
592 kinds.dedup();
593 Ok(Some(kinds))
594 }
595
    /// Decides whether an event, identified by its `event_class` /
    /// `payload_type` pair, belongs to the given search kind.
    ///
    /// Each kind accepts both its dedicated event class and the `event_msg`
    /// envelope carrying an equivalent payload type. This classification must
    /// stay in sync with the SQL predicates built by
    /// `single_event_kind_clause`.
    fn matches_event_kind(event_class: &str, payload_type: &str, kind: SearchEventKind) -> bool {
        match kind {
            SearchEventKind::Message => {
                event_class == "message"
                    || (event_class == "event_msg"
                        && matches!(
                            payload_type,
                            "user_message" | "agent_message" | "message" | "text"
                        ))
            }
            SearchEventKind::Reasoning => {
                event_class == "reasoning"
                    || (event_class == "event_msg" && payload_type == "agent_reasoning")
            }
            SearchEventKind::ToolCall => {
                event_class == "tool_call"
                    || (event_class == "event_msg"
                        && matches!(
                            payload_type,
                            "tool_use" | "function_call" | "custom_tool_call" | "web_search_call"
                        ))
            }
            SearchEventKind::ToolResult => {
                event_class == "tool_result"
                    || (event_class == "event_msg"
                        && matches!(
                            payload_type,
                            "tool_result"
                                | "function_call_output"
                                | "custom_tool_call_output"
                                | "search_results_received"
                        ))
            }
        }
    }
631
632 fn matches_requested_event_kinds(
633 event_class: &str,
634 payload_type: &str,
635 event_kinds: &[SearchEventKind],
636 ) -> bool {
637 event_kinds
638 .iter()
639 .any(|kind| Self::matches_event_kind(event_class, payload_type, *kind))
640 }
641
    /// Builds the SQL predicate matching one event kind, parameterized over
    /// the column expressions so it can be applied to any table alias.
    ///
    /// Must stay in sync with the Rust-side classifier
    /// `matches_event_kind`: each kind matches its dedicated event class or
    /// the `event_msg` envelope with an equivalent payload type.
    fn single_event_kind_clause(
        event_class_expr: &str,
        payload_type_expr: &str,
        kind: SearchEventKind,
    ) -> String {
        match kind {
            SearchEventKind::Message => format!(
                "({event_class_expr} = 'message' OR ({event_class_expr} = 'event_msg' AND {payload_type_expr} IN ('user_message', 'agent_message', 'message', 'text')))"
            ),
            SearchEventKind::Reasoning => format!(
                "({event_class_expr} = 'reasoning' OR ({event_class_expr} = 'event_msg' AND {payload_type_expr} = 'agent_reasoning'))"
            ),
            SearchEventKind::ToolCall => format!(
                "({event_class_expr} = 'tool_call' OR ({event_class_expr} = 'event_msg' AND {payload_type_expr} IN ('tool_use', 'function_call', 'custom_tool_call', 'web_search_call')))"
            ),
            SearchEventKind::ToolResult => format!(
                "({event_class_expr} = 'tool_result' OR ({event_class_expr} = 'event_msg' AND {payload_type_expr} IN ('tool_result', 'function_call_output', 'custom_tool_call_output', 'search_results_received')))"
            ),
        }
    }
662
663 fn event_kind_filter_clause(
664 event_class_expr: &str,
665 payload_type_expr: &str,
666 event_kinds: &[SearchEventKind],
667 ) -> String {
668 let clauses = event_kinds
669 .iter()
670 .map(|kind| Self::single_event_kind_clause(event_class_expr, payload_type_expr, *kind))
671 .collect::<Vec<_>>();
672 if clauses.len() == 1 {
673 clauses[0].clone()
674 } else {
675 format!("({})", clauses.join(" OR "))
676 }
677 }
678
679 fn is_low_information_system_event(actor_role: &str, payload_type: &str) -> bool {
680 actor_role.eq_ignore_ascii_case("system")
681 && matches!(
682 payload_type.to_ascii_lowercase().as_str(),
683 "progress" | "file_history_snapshot" | "system"
684 )
685 }
686
687 fn open_context_filter_clause(include_system_events: bool) -> &'static str {
688 if include_system_events {
689 ""
690 } else {
691 " AND NOT (lowerUTF8(actor_role) = 'system' AND lowerUTF8(payload_type) IN ('progress', 'file_history_snapshot', 'system'))"
692 }
693 }
694
695 fn map_conversation_row(row: ConversationSummaryRow) -> ConversationSummary {
696 ConversationSummary {
697 session_id: row.session_id,
698 first_event_time: row.first_event_time,
699 first_event_unix_ms: row.first_event_unix_ms,
700 last_event_time: row.last_event_time,
701 last_event_unix_ms: row.last_event_unix_ms,
702 total_turns: row.total_turns,
703 total_events: row.total_events,
704 user_messages: row.user_messages,
705 assistant_messages: row.assistant_messages,
706 tool_calls: row.tool_calls,
707 tool_results: row.tool_results,
708 mode: Self::parse_mode(&row.mode),
709 }
710 }
711
712 fn map_turn_row(row: TurnSummaryRow) -> TurnSummary {
713 TurnSummary {
714 session_id: row.session_id,
715 turn_seq: row.turn_seq,
716 turn_id: row.turn_id,
717 started_at: row.started_at,
718 started_at_unix_ms: row.started_at_unix_ms,
719 ended_at: row.ended_at,
720 ended_at_unix_ms: row.ended_at_unix_ms,
721 total_events: row.total_events,
722 user_messages: row.user_messages,
723 assistant_messages: row.assistant_messages,
724 tool_calls: row.tool_calls,
725 tool_results: row.tool_results,
726 reasoning_items: row.reasoning_items,
727 }
728 }
729
730 fn map_trace_event(row: TraceEventRow) -> TraceEvent {
731 TraceEvent {
732 session_id: row.session_id,
733 event_uid: row.event_uid,
734 event_order: row.event_order,
735 turn_seq: row.turn_seq,
736 event_time: row.event_time,
737 actor_role: row.actor_role,
738 event_class: row.event_class,
739 payload_type: row.payload_type,
740 call_id: row.call_id,
741 name: row.name,
742 phase: row.phase,
743 item_id: row.item_id,
744 source_ref: row.source_ref,
745 text_content: row.text_content,
746 payload_json: row.payload_json,
747 token_usage_json: row.token_usage_json,
748 }
749 }
750
    /// Optional SQL predicate pinning the joined mode column to one mode;
    /// `None` means no mode filtering. NULL modes fall back to 'chat',
    /// matching the default assigned by the mode subquery.
    fn mode_filter_clause(mode: Option<ConversationMode>) -> Option<String> {
        mode.map(|m| format!("ifNull(m.mode, 'chat') = {}", sql_quote(m.as_str())))
    }
754
    /// Loads every turn summary for one session, ordered by turn sequence.
    ///
    /// The explicit casts keep the JSONEachRow field types aligned with
    /// `TurnSummaryRow`; timestamps are exported both as strings and as unix
    /// milliseconds. The session id is escaped via `sql_quote`.
    async fn load_turns_for_session(&self, session_id: &str) -> RepoResult<Vec<TurnSummary>> {
        let turn_summary = self.table_ref("v_turn_summary");
        let query = format!(
            "SELECT
                session_id,
                toUInt32(turn_seq) AS turn_seq,
                ifNull(turn_id, '') AS turn_id,
                toString(started_at) AS started_at,
                toInt64(toUnixTimestamp64Milli(started_at)) AS started_at_unix_ms,
                toString(ended_at) AS ended_at,
                toInt64(toUnixTimestamp64Milli(ended_at)) AS ended_at_unix_ms,
                toUInt64(total_events) AS total_events,
                toUInt64(user_messages) AS user_messages,
                toUInt64(assistant_messages) AS assistant_messages,
                toUInt64(tool_calls) AS tool_calls,
                toUInt64(tool_results) AS tool_results,
                toUInt64(reasoning_items) AS reasoning_items
            FROM {turn_summary}
            WHERE session_id = {}
            ORDER BY turn_seq ASC
            FORMAT JSONEachRow",
            sql_quote(session_id),
        );

        let rows: Vec<TurnSummaryRow> = self.map_backend(self.ch.query_rows(&query, None).await)?;
        Ok(rows.into_iter().map(Self::map_turn_row).collect())
    }
782
    /// Loads the summary for one session, joined with the mode subquery so
    /// the returned summary carries a classified mode; returns `None` when
    /// the session is unknown.
    async fn load_conversation_summary(
        &self,
        session_id: &str,
    ) -> RepoResult<Option<ConversationSummary>> {
        let session_summary = self.table_ref("v_session_summary");
        let mode_subquery = self.mode_subquery();
        let query = format!(
            "SELECT
                s.session_id,
                toString(s.first_event_time) AS first_event_time,
                toInt64(toUnixTimestamp64Milli(s.first_event_time)) AS first_event_unix_ms,
                toString(s.last_event_time) AS last_event_time,
                toInt64(toUnixTimestamp64Milli(s.last_event_time)) AS last_event_unix_ms,
                toUInt32(s.total_turns) AS total_turns,
                toUInt64(s.total_events) AS total_events,
                toUInt64(s.user_messages) AS user_messages,
                toUInt64(s.assistant_messages) AS assistant_messages,
                toUInt64(s.tool_calls) AS tool_calls,
                toUInt64(s.tool_results) AS tool_results,
                ifNull(m.mode, 'chat') AS mode
            FROM {session_summary} AS s
            LEFT JOIN ({mode_subquery}) AS m ON m.session_id = s.session_id
            WHERE s.session_id = {}
            LIMIT 1
            FORMAT JSONEachRow",
            sql_quote(session_id),
        );

        let rows: Vec<ConversationSummaryRow> =
            self.map_backend(self.ch.query_rows(&query, None).await)?;
        Ok(rows.into_iter().next().map(Self::map_conversation_row))
    }
815
816 async fn corpus_stats(&self) -> RepoResult<(u64, u64)> {
817 let now = Instant::now();
818 {
819 let cache = self.stats_cache.read().await;
820 if let Some(entry) = cache.corpus_stats.as_ref() {
821 if now.duration_since(entry.fetched_at) <= CORPUS_STATS_CACHE_TTL {
822 return Ok((entry.docs, entry.total_doc_len));
823 }
824 }
825 }
826
827 let from_stats_query = format!(
828 "SELECT toUInt64(ifNull(sum(docs), 0)) AS docs, toUInt64(ifNull(sum(total_doc_len), 0)) AS total_doc_len FROM {} FORMAT JSONEachRow",
829 self.table_ref("search_corpus_stats")
830 );
831
832 let from_stats: Vec<CorpusStatsRow> =
833 self.map_backend(self.ch.query_rows(&from_stats_query, None).await)?;
834
835 if let Some(row) = from_stats.first() {
836 if row.docs > 0 {
837 self.cache_corpus_stats(row.docs, row.total_doc_len, now)
838 .await;
839 return Ok((row.docs, row.total_doc_len));
840 }
841 }
842
843 let fallback_query = format!(
844 "SELECT toUInt64(count()) AS docs, toUInt64(ifNull(sum(doc_len), 0)) AS total_doc_len FROM {} FINAL WHERE doc_len > 0 FORMAT JSONEachRow",
845 self.table_ref("search_documents")
846 );
847 let fallback: Vec<CorpusStatsRow> =
848 self.map_backend(self.ch.query_rows(&fallback_query, None).await)?;
849 let resolved = if let Some(row) = fallback.first() {
850 (row.docs, row.total_doc_len)
851 } else {
852 (0, 0)
853 };
854
855 let mut cache = self.stats_cache.write().await;
856 cache.corpus_stats = Some(CorpusStatsCacheEntry {
857 docs: resolved.0,
858 total_doc_len: resolved.1,
859 fetched_at: now,
860 });
861 Ok(resolved)
862 }
863
    /// Writes fresh corpus statistics into the stats cache, stamping them
    /// with the caller-supplied fetch time for TTL accounting.
    async fn cache_corpus_stats(&self, docs: u64, total_doc_len: u64, fetched_at: Instant) {
        let mut cache = self.stats_cache.write().await;
        cache.corpus_stats = Some(CorpusStatsCacheEntry {
            docs,
            total_doc_len,
            fetched_at,
        });
    }
872
873 async fn cache_term_df_values(
874 &self,
875 terms: impl IntoIterator<Item = String>,
876 map: &HashMap<String, u64>,
877 fetched_at: Instant,
878 ) {
879 let mut cache = self.stats_cache.write().await;
880 for term in terms {
881 let df = *map.get(&term).unwrap_or(&0);
882 cache
883 .term_df_by_term
884 .insert(term, TermDfCacheEntry { df, fetched_at });
885 }
886 }
887
    /// Returns up to `limit` popular non-blank MCP queries from the last
    /// seven days, ranked by frequency and then by average response time
    /// (slowest first, so prewarming targets the most expensive queries).
    async fn load_hot_queries_for_prewarm(&self, limit: usize) -> RepoResult<Vec<String>> {
        let query = format!(
            "SELECT raw_query
            FROM (
                SELECT
                    raw_query,
                    count() AS query_count,
                    avg(response_ms) AS avg_response_ms
                FROM {}
                WHERE source = 'moraine-mcp'
                AND ts >= now() - INTERVAL 7 DAY
                AND lengthUTF8(trim(BOTH ' ' FROM raw_query)) > 0
                GROUP BY raw_query
                ORDER BY query_count DESC, avg_response_ms DESC
                LIMIT {}
            )
            FORMAT JSONEachRow",
            self.table_ref("search_query_log"),
            limit
        );
        let rows: Vec<HotQueryRow> = self.map_backend(self.ch.query_rows(&query, None).await)?;
        Ok(rows.into_iter().map(|row| row.raw_query).collect())
    }
911
    /// Resolves document frequencies for `terms`, serving fresh values from
    /// the stats cache and querying `search_postings` only for misses.
    ///
    /// Every input term ends up with an entry in the returned map: terms not
    /// present in the postings table are recorded (and cached) with df = 0.
    async fn df_map(&self, terms: &[String]) -> RepoResult<HashMap<String, u64>> {
        let now = Instant::now();
        let postings_table = self.table_ref("search_postings");

        let mut map = HashMap::<String, u64>::new();
        let mut missing_terms = Vec::<String>::new();

        // Partition terms into cache hits (fresh entries only) and misses
        // under a single read lock.
        {
            let cache = self.stats_cache.read().await;
            for term in terms {
                if let Some(entry) = cache.term_df_by_term.get(term) {
                    if now.duration_since(entry.fetched_at) <= TERM_DF_CACHE_TTL {
                        map.insert(term.clone(), entry.df);
                        continue;
                    }
                }
                missing_terms.push(term.clone());
            }
        }

        if missing_terms.is_empty() {
            return Ok(map);
        }

        // One query resolves every missing term's df at once.
        let missing_terms_array = sql_array_strings(&missing_terms);
        let df_query = format!(
            "SELECT term, toUInt64(uniqExact(doc_id)) AS df FROM {postings_table} WHERE term IN {missing_terms_array} GROUP BY term FORMAT JSONEachRow",
        );
        let rows: Vec<DfRow> = self.map_backend(self.ch.query_rows(&df_query, None).await)?;
        for row in rows {
            map.insert(row.term, row.df);
        }

        // Terms the query did not return simply do not occur in the corpus.
        for term in &missing_terms {
            map.entry(term.clone()).or_insert(0);
        }

        self.cache_term_df_values(missing_terms, &map, now).await;
        Ok(map)
    }
952
    /// Builds the full SQL-side BM25 search query over the postings and
    /// documents tables.
    ///
    /// The query joins postings (restricted to the query terms) against a
    /// deduplicated view of `search_documents` (grouped by `event_uid` with
    /// `any(...)` per column), sums a BM25 contribution per matched term via
    /// `transform(term, q_terms, q_idf, 0.0)`, and filters by
    /// `min_should_match` / `min_score` in the HAVING clause. Ordering is by
    /// score descending with `event_uid` as a deterministic tiebreak.
    ///
    /// `use_document_codex_flag` selects between the materialized
    /// `has_codex_mcp` column (when the schema has it) and a substring probe
    /// over `payload_json`; `k1` / `b` come from the repo config, clamped to
    /// sane BM25 ranges.
    fn build_search_events_sql(
        &self,
        terms: &[String],
        idf_by_term: &HashMap<String, f64>,
        avgdl: f64,
        include_tool_events: bool,
        event_kinds: Option<&[SearchEventKind]>,
        exclude_codex_mcp: bool,
        use_document_codex_flag: bool,
        session_id: Option<&str>,
        min_should_match: u16,
        min_score: f64,
        limit: u16,
    ) -> RepoResult<String> {
        if terms.is_empty() {
            return Err(RepoError::invalid_argument(
                "cannot build search query with empty terms",
            ));
        }

        let postings_table = self.table_ref("search_postings");
        let documents_table = self.table_ref("search_documents");
        let terms_array_sql = sql_array_strings(terms);
        // IDF values are positioned to line up with the terms array so
        // `transform` can map term -> idf inside SQL; unknown terms get 0.
        let idf_vals: Vec<f64> = terms
            .iter()
            .map(|t| *idf_by_term.get(t).unwrap_or(&0.0))
            .collect();
        let idf_array_sql = sql_array_f64(&idf_vals);
        // Deduplicated documents subquery; the only difference between the
        // two variants is whether has_codex_mcp is read or hardcoded to 0.
        let documents_join_sql = if use_document_codex_flag {
            format!(
                "(SELECT
                    event_uid,
                    any(session_id) AS session_id,
                    any(source_name) AS source_name,
                    any(provider) AS provider,
                    any(event_class) AS event_class,
                    any(payload_type) AS payload_type,
                    any(actor_role) AS actor_role,
                    any(name) AS name,
                    any(phase) AS phase,
                    any(source_ref) AS source_ref,
                    any(doc_len) AS doc_len,
                    any(text_content) AS text_content,
                    any(payload_json) AS payload_json,
                    toUInt8(any(has_codex_mcp)) AS has_codex_mcp
                FROM {documents_table}
                GROUP BY event_uid)"
            )
        } else {
            format!(
                "(SELECT
                    event_uid,
                    any(session_id) AS session_id,
                    any(source_name) AS source_name,
                    any(provider) AS provider,
                    any(event_class) AS event_class,
                    any(payload_type) AS payload_type,
                    any(actor_role) AS actor_role,
                    any(name) AS name,
                    any(phase) AS phase,
                    any(source_ref) AS source_ref,
                    any(doc_len) AS doc_len,
                    any(text_content) AS text_content,
                    any(payload_json) AS payload_json,
                    toUInt8(0) AS has_codex_mcp
                FROM {documents_table}
                GROUP BY event_uid)"
            )
        };

        let mut where_clauses = vec![format!("p.term IN {}", terms_array_sql)];

        if let Some(sid) = session_id {
            where_clauses.push(format!("d.session_id = {}", sql_quote(sid)));
        }

        // Event scoping: explicit kinds > "tools included" > default scope
        // restricted to conversational classes minus bookkeeping payloads.
        if let Some(event_kinds) = event_kinds {
            where_clauses.push(Self::event_kind_filter_clause(
                "d.event_class",
                "d.payload_type",
                event_kinds,
            ));
        } else if include_tool_events {
            where_clauses.push("d.payload_type != 'token_count'".to_string());
        } else {
            where_clauses
                .push("d.event_class IN ('message', 'reasoning', 'event_msg')".to_string());
            where_clauses.push(
                "d.payload_type NOT IN ('token_count', 'task_started', 'task_complete', 'turn_aborted', 'item_completed')"
                    .to_string(),
            );
        }

        if exclude_codex_mcp {
            if use_document_codex_flag {
                where_clauses.push("toUInt8(d.has_codex_mcp) = 0".to_string());
            } else {
                // No materialized flag: fall back to scanning payload_json.
                where_clauses.push(
                    "positionCaseInsensitiveUTF8(d.payload_json, 'codex-mcp') = 0".to_string(),
                );
            }
            where_clauses.push("lowerUTF8(d.name) NOT IN ('search', 'open')".to_string());
        }

        let where_sql = where_clauses.join("\n AND ");
        // Keep BM25 parameters in valid ranges regardless of config input.
        let k1 = self.cfg.bm25_k1.max(0.01);
        let b = self.cfg.bm25_b.clamp(0.0, 1.0);

        Ok(format!(
            "WITH
                {k1:.6} AS k1,
                {b:.6} AS b,
                greatest({avgdl:.6}, 1.0) AS avgdl,
                {terms_array_sql} AS q_terms,
                {idf_array_sql} AS q_idf
            SELECT
                p.doc_id AS event_uid,
                any(d.session_id) AS session_id,
                any(d.source_name) AS source_name,
                any(d.provider) AS provider,
                any(d.event_class) AS event_class,
                any(d.payload_type) AS payload_type,
                any(d.actor_role) AS actor_role,
                any(d.name) AS name,
                any(d.phase) AS phase,
                any(d.source_ref) AS source_ref,
                any(d.doc_len) AS doc_len,
                leftUTF8(any(d.text_content), {preview}) AS text_preview,
                sum(
                    transform(toString(p.term), q_terms, q_idf, 0.0)
                    *
                    (
                        (toFloat64(p.tf) * (k1 + 1.0))
                        /
                        (toFloat64(p.tf) + k1 * (1.0 - b + b * (toFloat64(p.doc_len) / avgdl)))
                    )
                ) AS score,
                uniqExact(p.term) AS matched_terms
            FROM {postings_table} AS p
            INNER JOIN {documents_join_sql} AS d ON d.event_uid = p.doc_id
            WHERE {where_sql}
            GROUP BY p.doc_id
            HAVING matched_terms >= {min_should_match} AND score >= {min_score:.6}
            ORDER BY score DESC, event_uid ASC
            LIMIT {limit}
            FORMAT JSONEachRow",
            preview = self.cfg.preview_chars,
            postings_table = postings_table,
            documents_join_sql = documents_join_sql,
        ))
    }
1104
    /// Builds the SQL that hydrates full metadata rows for a set of
    /// already-ranked `event_uids` from `search_documents`, one JSONEachRow
    /// line per event.
    ///
    /// `use_document_codex_flag` selects how `has_codex_mcp` is derived:
    /// from the materialized `has_codex_mcp` column when the schema has it,
    /// otherwise from a case-insensitive substring probe of `payload_json`.
    ///
    /// Returns `RepoError::invalid_argument` for an empty `event_uids` slice.
    fn build_search_events_hydrate_sql(
        &self,
        event_uids: &[String],
        use_document_codex_flag: bool,
    ) -> RepoResult<String> {
        if event_uids.is_empty() {
            return Err(RepoError::invalid_argument(
                "cannot hydrate search rows for empty event_uids",
            ));
        }
        let documents_table = self.table_ref("search_documents");
        let event_uids_array = sql_array_strings(event_uids);
        // Outer-query expression for has_codex_mcp; when the schema flag is
        // absent it is recomputed from payload_json rather than read from the
        // inner subquery (which then emits a constant 0).
        let codex_expr = if use_document_codex_flag {
            "d.has_codex_mcp"
        } else {
            "toUInt8(positionCaseInsensitiveUTF8(d.payload_json, 'codex-mcp') > 0)"
        };
        // Inner subquery collapses possible duplicate document rows per
        // event_uid with any(); the two variants differ only in whether the
        // has_codex_mcp column can be read from the table.
        let documents_source_sql = if use_document_codex_flag {
            format!(
                "(SELECT
                event_uid,
                any(session_id) AS session_id,
                any(source_name) AS source_name,
                any(provider) AS provider,
                any(event_class) AS event_class,
                any(payload_type) AS payload_type,
                any(actor_role) AS actor_role,
                any(name) AS name,
                any(phase) AS phase,
                any(source_ref) AS source_ref,
                any(doc_len) AS doc_len,
                any(text_content) AS text_content,
                any(payload_json) AS payload_json,
                toUInt8(any(has_codex_mcp)) AS has_codex_mcp
            FROM {documents_table}
            WHERE event_uid IN {event_uids_array}
            GROUP BY event_uid)"
            )
        } else {
            format!(
                "(SELECT
                event_uid,
                any(session_id) AS session_id,
                any(source_name) AS source_name,
                any(provider) AS provider,
                any(event_class) AS event_class,
                any(payload_type) AS payload_type,
                any(actor_role) AS actor_role,
                any(name) AS name,
                any(phase) AS phase,
                any(source_ref) AS source_ref,
                any(doc_len) AS doc_len,
                any(text_content) AS text_content,
                any(payload_json) AS payload_json,
                toUInt8(0) AS has_codex_mcp
            FROM {documents_table}
            WHERE event_uid IN {event_uids_array}
            GROUP BY event_uid)"
            )
        };

        Ok(format!(
            "SELECT
            d.event_uid AS event_uid,
            d.session_id AS session_id,
            d.source_name AS source_name,
            d.provider AS provider,
            d.event_class AS event_class,
            d.payload_type AS payload_type,
            d.actor_role AS actor_role,
            d.name AS name,
            d.phase AS phase,
            d.source_ref AS source_ref,
            d.doc_len AS doc_len,
            leftUTF8(d.text_content, {preview}) AS text_preview,
            {codex_expr} AS has_codex_mcp
        FROM {documents_source_sql} AS d
        FORMAT JSONEachRow",
            preview = self.cfg.preview_chars,
            codex_expr = codex_expr,
            documents_source_sql = documents_source_sql,
        ))
    }
1188
1189 async fn search_documents_has_codex_flag(&self) -> RepoResult<bool> {
1190 let now = Instant::now();
1191 {
1192 let cache = self.stats_cache.read().await;
1193 if let Some((value, fetched_at)) = cache.has_codex_flag_column {
1194 if now.duration_since(fetched_at) <= SEARCH_SCHEMA_CACHE_TTL {
1195 return Ok(value);
1196 }
1197 }
1198 }
1199
1200 let query = format!(
1201 "SELECT
1202 toUInt8(count() > 0) AS exists
1203 FROM system.columns
1204 WHERE database = {}
1205 AND table = 'search_documents'
1206 AND name = 'has_codex_mcp'
1207 FORMAT JSONEachRow",
1208 sql_quote(&self.ch.config().database)
1209 );
1210 let rows: Vec<ColumnExistsRow> =
1211 self.map_backend(self.ch.query_rows(&query, None).await)?;
1212 let exists = rows.first().map(|row| row.exists != 0).unwrap_or(false);
1213
1214 let mut cache = self.stats_cache.write().await;
1215 cache.has_codex_flag_column = Some((exists, now));
1216 Ok(exists)
1217 }
1218
1219 fn passes_search_doc_filters(
1220 row: &SearchDocExtraCacheEntry,
1221 include_tool_events: bool,
1222 event_kinds: Option<&[SearchEventKind]>,
1223 exclude_codex_mcp: bool,
1224 session_id: Option<&str>,
1225 ) -> bool {
1226 if let Some(sid) = session_id {
1227 if row.session_id != sid {
1228 return false;
1229 }
1230 }
1231
1232 if let Some(event_kinds) = event_kinds {
1233 if !Self::matches_requested_event_kinds(
1234 &row.event_class,
1235 &row.payload_type,
1236 event_kinds,
1237 ) {
1238 return false;
1239 }
1240 } else if include_tool_events {
1241 if row.payload_type == "token_count" {
1242 return false;
1243 }
1244 } else {
1245 if row.event_class != "message"
1246 && row.event_class != "reasoning"
1247 && row.event_class != "event_msg"
1248 {
1249 return false;
1250 }
1251 if row.payload_type == "token_count"
1252 || row.payload_type == "task_started"
1253 || row.payload_type == "task_complete"
1254 || row.payload_type == "turn_aborted"
1255 || row.payload_type == "item_completed"
1256 {
1257 return false;
1258 }
1259 }
1260
1261 if exclude_codex_mcp {
1262 if row.has_codex_mcp != 0 {
1263 return false;
1264 }
1265 if row.name.eq_ignore_ascii_case("search") || row.name.eq_ignore_ascii_case("open") {
1266 return false;
1267 }
1268 }
1269
1270 true
1271 }
1272
1273 fn bm25_term_score(tf: u16, doc_len: u32, avgdl: f64, k1: f64, b: f64) -> f64 {
1274 let tf = tf as f64;
1275 let norm = tf + k1 * (1.0 - b + b * (doc_len as f64 / avgdl.max(1.0)));
1276 if norm <= 0.0 {
1277 0.0
1278 } else {
1279 tf * (k1 + 1.0) / norm
1280 }
1281 }
1282
1283 fn bm25_idf(docs: u64, df: u64) -> f64 {
1284 let idf = if df == 0 {
1285 (1.0 + ((docs as f64 + 0.5) / 0.5)).ln()
1286 } else {
1287 let n = docs.max(df) as f64;
1288 (1.0 + ((n - df as f64 + 0.5) / (df as f64 + 0.5))).ln()
1289 };
1290 idf.max(0.0)
1291 }
1292
    /// Fetches postings rows (doc id, doc length, term frequency) for each
    /// query term, serving from the in-process TTL cache where possible and
    /// batching all cache misses into a single ClickHouse query.
    ///
    /// Every requested term is guaranteed a key in the returned map; terms
    /// with no postings map to an empty shared slice.
    async fn load_term_postings_for_terms(
        &self,
        terms: &[String],
    ) -> RepoResult<HashMap<String, Arc<[CachedPostingRow]>>> {
        let now = Instant::now();
        let mut by_term = HashMap::<String, Arc<[CachedPostingRow]>>::new();
        let mut missing_terms = Vec::<String>::new();

        // Cache pass under the read lock: take entries still within TTL,
        // collect everything else for one batched fetch below.
        {
            let cache = self.term_postings_cache.read().await;
            for term in terms {
                if let Some(entry) = cache.get(term) {
                    if now.duration_since(entry.fetched_at) <= TERM_POSTINGS_CACHE_TTL {
                        by_term.insert(term.clone(), Arc::clone(&entry.rows));
                        continue;
                    }
                }
                missing_terms.push(term.clone());
            }
        }

        if !missing_terms.is_empty() {
            let postings_table = self.table_ref("search_postings");
            let terms_array = sql_array_strings(&missing_terms);
            let query = format!(
                "SELECT
                term,
                doc_id AS event_uid,
                doc_len,
                tf
            FROM {postings_table}
            WHERE term IN {terms_array}
            FORMAT JSONEachRow",
            );

            let fetched_rows: Vec<FetchedPostingRow> =
                self.map_backend(self.ch.query_rows(&query, None).await)?;
            // Group the flat result set by term so each term can be cached as
            // one immutable shared slice.
            let mut grouped = HashMap::<String, Vec<CachedPostingRow>>::new();
            for row in fetched_rows {
                grouped.entry(row.term).or_default().push(CachedPostingRow {
                    event_uid: row.event_uid,
                    doc_len: row.doc_len,
                    tf: row.tf,
                });
            }

            let mut cache = self.term_postings_cache.write().await;

            for term in missing_terms {
                let rows_vec = grouped.remove(&term).unwrap_or_default();
                let rows: Arc<[CachedPostingRow]> = Arc::from(rows_vec.into_boxed_slice());
                by_term.insert(term.clone(), Arc::clone(&rows));
                // Very hot terms with oversized postings lists are returned to
                // the caller but not cached, to bound cache memory.
                if rows.len() <= TERM_POSTINGS_CACHE_MAX_ROWS_PER_TERM {
                    cache.insert(
                        term,
                        TermPostingsCacheEntry {
                            rows,
                            fetched_at: now,
                        },
                    );
                }
            }

            // Evict oldest-first until both the entry-count and total-row
            // budgets are satisfied.
            let mut cached_rows_total = cache.values().map(|entry| entry.rows.len()).sum::<usize>();
            while cache.len() > TERM_POSTINGS_CACHE_MAX_ENTRIES
                || cached_rows_total > TERM_POSTINGS_CACHE_MAX_ROWS_TOTAL
            {
                if let Some(oldest_key) = cache
                    .iter()
                    .min_by_key(|(_, entry)| entry.fetched_at)
                    .map(|(k, _)| k.clone())
                {
                    if let Some(evicted) = cache.remove(&oldest_key) {
                        cached_rows_total = cached_rows_total.saturating_sub(evicted.rows.len());
                    }
                } else {
                    break;
                }
            }
        }

        // Guarantee a (possibly empty) entry for every requested term.
        for term in terms {
            by_term
                .entry(term.clone())
                .or_insert_with(|| Arc::from(Vec::<CachedPostingRow>::new().into_boxed_slice()));
        }
        Ok(by_term)
    }
1381
    /// Hydrates per-document metadata (session, class, preview, codex flag)
    /// for the given event uids, using a TTL cache and fetching only the
    /// misses via `build_search_events_hydrate_sql`.
    ///
    /// Event uids unknown to ClickHouse are simply absent from the returned
    /// map; callers must handle missing keys.
    async fn load_search_doc_extras(
        &self,
        event_uids: &[String],
        use_document_codex_flag: bool,
    ) -> RepoResult<HashMap<String, SearchDocExtraCacheEntry>> {
        let now = Instant::now();
        let mut by_uid = HashMap::<String, SearchDocExtraCacheEntry>::new();
        let mut missing_uids = Vec::<String>::new();

        // Cache pass under the read lock; expired or absent uids fall through
        // to the batched fetch below.
        {
            let cache = self.search_doc_extra_cache.read().await;
            for uid in event_uids {
                if let Some(entry) = cache.get(uid) {
                    if now.duration_since(entry.fetched_at) <= SEARCH_DOC_EXTRA_CACHE_TTL {
                        by_uid.insert(uid.clone(), entry.clone());
                        continue;
                    }
                }
                missing_uids.push(uid.clone());
            }
        }

        if !missing_uids.is_empty() {
            let query =
                self.build_search_events_hydrate_sql(&missing_uids, use_document_codex_flag)?;
            let fetched_rows: Vec<SearchDocExtraRow> =
                self.map_backend(self.ch.query_rows(&query, None).await)?;

            let mut cache = self.search_doc_extra_cache.write().await;

            for row in fetched_rows {
                let entry = SearchDocExtraCacheEntry {
                    session_id: row.session_id,
                    source_name: row.source_name,
                    provider: row.provider,
                    event_class: row.event_class,
                    payload_type: row.payload_type,
                    actor_role: row.actor_role,
                    name: row.name,
                    phase: row.phase,
                    source_ref: row.source_ref,
                    doc_len: row.doc_len,
                    text_preview: row.text_preview,
                    has_codex_mcp: row.has_codex_mcp,
                    fetched_at: now,
                };
                by_uid.insert(row.event_uid.clone(), entry.clone());
                cache.insert(row.event_uid, entry);
            }

            // Size-based eviction: drop the oldest entries until under budget.
            while cache.len() > SEARCH_DOC_EXTRA_CACHE_MAX_ENTRIES {
                if let Some(oldest_key) = cache
                    .iter()
                    .min_by_key(|(_, entry)| entry.fetched_at)
                    .map(|(k, _)| k.clone())
                {
                    cache.remove(&oldest_key);
                } else {
                    break;
                }
            }
        }

        Ok(by_uid)
    }
1447
1448 async fn search_events_rows(
1449 &self,
1450 terms: &[String],
1451 docs: u64,
1452 avgdl: f64,
1453 include_tool_events: bool,
1454 event_kinds: Option<&[SearchEventKind]>,
1455 exclude_codex_mcp: bool,
1456 session_id: Option<&str>,
1457 min_should_match: u16,
1458 min_score: f64,
1459 limit: u16,
1460 ) -> RepoResult<Vec<SearchRow>> {
1461 let (rows, _candidate_count) = self
1462 .search_events_rows_fast_pass(
1463 terms,
1464 docs,
1465 avgdl,
1466 include_tool_events,
1467 event_kinds,
1468 exclude_codex_mcp,
1469 session_id,
1470 min_should_match,
1471 min_score,
1472 limit,
1473 usize::MAX,
1474 )
1475 .await?;
1476 if !rows.is_empty() {
1477 return Ok(rows);
1478 }
1479
1480 self.search_events_rows_exact_sql(
1481 terms,
1482 docs,
1483 avgdl,
1484 include_tool_events,
1485 event_kinds,
1486 exclude_codex_mcp,
1487 session_id,
1488 min_should_match,
1489 min_score,
1490 limit,
1491 )
1492 .await
1493 }
1494
1495 async fn search_events_rows_exact_sql(
1496 &self,
1497 terms: &[String],
1498 docs: u64,
1499 avgdl: f64,
1500 include_tool_events: bool,
1501 event_kinds: Option<&[SearchEventKind]>,
1502 exclude_codex_mcp: bool,
1503 session_id: Option<&str>,
1504 min_should_match: u16,
1505 min_score: f64,
1506 limit: u16,
1507 ) -> RepoResult<Vec<SearchRow>> {
1508 let df_map = self.df_map(terms).await?;
1509 let mut idf_by_term = HashMap::<String, f64>::new();
1510 for term in terms {
1511 let df = *df_map.get(term).unwrap_or(&0);
1512 idf_by_term.insert(term.clone(), Self::bm25_idf(docs, df));
1513 }
1514
1515 let use_document_codex_flag = self.search_documents_has_codex_flag().await?;
1516 let fallback_sql = self.build_search_events_sql(
1517 terms,
1518 &idf_by_term,
1519 avgdl,
1520 include_tool_events,
1521 event_kinds,
1522 exclude_codex_mcp,
1523 use_document_codex_flag,
1524 session_id,
1525 min_should_match,
1526 min_score,
1527 limit,
1528 )?;
1529
1530 let mut fallback_rows: Vec<SearchRow> =
1531 self.map_backend(self.ch.query_rows(&fallback_sql, None).await)?;
1532 fallback_rows.sort_by(|a, b| {
1533 b.score
1534 .total_cmp(&a.score)
1535 .then_with(|| a.event_uid.cmp(&b.event_uid))
1536 });
1537 Ok(fallback_rows)
1538 }
1539
1540 async fn search_events_rows_by_strategy(
1541 &self,
1542 strategy: SearchEventsStrategy,
1543 terms: &[String],
1544 docs: u64,
1545 avgdl: f64,
1546 include_tool_events: bool,
1547 event_kinds: Option<&[SearchEventKind]>,
1548 exclude_codex_mcp: bool,
1549 session_id: Option<&str>,
1550 min_should_match: u16,
1551 min_score: f64,
1552 limit: u16,
1553 ) -> RepoResult<Vec<SearchRow>> {
1554 match strategy {
1555 SearchEventsStrategy::Optimized => {
1556 self.search_events_rows(
1557 terms,
1558 docs,
1559 avgdl,
1560 include_tool_events,
1561 event_kinds,
1562 exclude_codex_mcp,
1563 session_id,
1564 min_should_match,
1565 min_score,
1566 limit,
1567 )
1568 .await
1569 }
1570 SearchEventsStrategy::OracleExact => {
1571 self.search_events_rows_exact_sql(
1572 terms,
1573 docs,
1574 avgdl,
1575 include_tool_events,
1576 event_kinds,
1577 exclude_codex_mcp,
1578 session_id,
1579 min_should_match,
1580 min_score,
1581 limit,
1582 )
1583 .await
1584 }
1585 }
1586 }
1587
1588 fn dedupe_fetch_limit(limit: u16) -> u16 {
1589 limit.saturating_mul(3).max(limit)
1590 }
1591
1592 fn is_message_search_row(row: &SearchRow) -> bool {
1593 row.event_class == "message" && row.payload_type == "message"
1594 }
1595
1596 fn is_event_msg_search_row(row: &SearchRow) -> bool {
1597 row.event_class == "event_msg"
1598 && (row.payload_type == "agent_message"
1599 || row.payload_type == "user_message"
1600 || row.payload_type == "event_msg")
1601 }
1602
    /// True for a canonical reasoning row: any payload under the `reasoning`
    /// event class.
    fn is_reasoning_search_row(row: &SearchRow) -> bool {
        row.event_class == "reasoning"
    }
1606
    /// True for an `event_msg` row that mirrors reasoning output (the
    /// `agent_reasoning` payload).
    fn is_event_msg_reasoning_search_row(row: &SearchRow) -> bool {
        row.event_class == "event_msg" && row.payload_type == "agent_reasoning"
    }
1610
1611 fn compact_preview_for_dedup(text: &str) -> String {
1612 text.split_whitespace().collect::<Vec<_>>().join(" ")
1613 }
1614
1615 fn search_rows_are_mirrors(a: &SearchRow, b: &SearchRow) -> bool {
1616 let is_message_pair = (Self::is_message_search_row(a) && Self::is_event_msg_search_row(b))
1617 || (Self::is_event_msg_search_row(a) && Self::is_message_search_row(b));
1618 let is_reasoning_pair = (Self::is_reasoning_search_row(a)
1619 && Self::is_event_msg_reasoning_search_row(b))
1620 || (Self::is_event_msg_reasoning_search_row(a) && Self::is_reasoning_search_row(b));
1621 let same_kind_pair = is_message_pair || is_reasoning_pair;
1622 if !same_kind_pair {
1623 return false;
1624 }
1625
1626 if a.session_id != b.session_id
1627 || a.actor_role != b.actor_role
1628 || a.matched_terms != b.matched_terms
1629 {
1630 return false;
1631 }
1632
1633 if (a.score - b.score).abs() > 1e-9 {
1634 return false;
1635 }
1636
1637 Self::compact_preview_for_dedup(&a.text_preview)
1638 == Self::compact_preview_for_dedup(&b.text_preview)
1639 }
1640
1641 fn search_row_kind_priority(row: &SearchRow) -> u8 {
1642 if Self::is_message_search_row(row) || Self::is_reasoning_search_row(row) {
1643 0
1644 } else if Self::is_event_msg_search_row(row) || Self::is_event_msg_reasoning_search_row(row)
1645 {
1646 1
1647 } else {
1648 2
1649 }
1650 }
1651
1652 fn should_replace_mirror(existing: &SearchRow, candidate: &SearchRow) -> bool {
1653 let existing_priority = Self::search_row_kind_priority(existing);
1654 let candidate_priority = Self::search_row_kind_priority(candidate);
1655 candidate_priority < existing_priority
1656 || (candidate_priority == existing_priority && candidate.event_uid < existing.event_uid)
1657 }
1658
1659 fn dedupe_search_rows(rows: Vec<SearchRow>, limit: u16) -> Vec<SearchRow> {
1660 let target = limit as usize;
1661 let mut deduped = Vec::<SearchRow>::with_capacity(rows.len().min(target));
1662
1663 for row in rows {
1664 if let Some(existing_idx) = deduped
1665 .iter()
1666 .position(|existing| Self::search_rows_are_mirrors(existing, &row))
1667 {
1668 if Self::should_replace_mirror(&deduped[existing_idx], &row) {
1669 deduped[existing_idx] = row;
1670 }
1671 continue;
1672 }
1673
1674 deduped.push(row);
1675 if deduped.len() >= target {
1676 break;
1677 }
1678 }
1679
1680 deduped
1681 }
1682
    /// In-process BM25 scoring path: pulls postings for every query term
    /// (cache or ClickHouse), accumulates per-document scores locally, then
    /// hydrates and filters the top candidates in chunks.
    ///
    /// Returns the surviving rows (at most `limit`) plus the number of
    /// candidates that met `min_should_match`/`min_score` before hydration
    /// filtering. An empty row set signals the caller to fall back to the
    /// exact SQL path.
    async fn search_events_rows_fast_pass(
        &self,
        terms: &[String],
        docs: u64,
        avgdl: f64,
        include_tool_events: bool,
        event_kinds: Option<&[SearchEventKind]>,
        exclude_codex_mcp: bool,
        session_id: Option<&str>,
        min_should_match: u16,
        min_score: f64,
        limit: u16,
        candidate_limit: usize,
    ) -> RepoResult<(Vec<SearchRow>, usize)> {
        // Borrowed view over a cached posting row plus its accumulated score.
        #[derive(Clone, Copy)]
        struct CandidateRef<'a> {
            row: &'a CachedPostingRow,
            score: f64,
            matched_terms: u64,
        }

        let postings_by_term = self.load_term_postings_for_terms(terms).await?;
        let use_document_codex_flag = self.search_documents_has_codex_flag().await?;
        let df_map = self.df_map(terms).await?;
        let k1 = self.cfg.bm25_k1.max(0.01);
        let b = self.cfg.bm25_b.clamp(0.0, 1.0);
        // Per-term IDF weights; unknown terms use df = 0 (smoothed).
        let mut idf_by_term = HashMap::<&str, f64>::new();
        for term in terms {
            let df = *df_map.get(term).unwrap_or(&0);
            idf_by_term.insert(term.as_str(), Self::bm25_idf(docs, df));
        }

        // Score accumulation: one entry per candidate document. Matched terms
        // are tracked as a 64-bit mask, which caps the effective query at the
        // first 64 terms.
        let mut accum_by_uid = HashMap::<&str, SearchScoreAccum<'_>>::new();
        for (idx, term) in terms.iter().enumerate() {
            if idx >= 64 {
                break;
            }
            let idf = *idf_by_term.get(term.as_str()).unwrap_or(&0.0);
            // Zero-IDF terms cannot contribute score; skip their postings.
            if idf <= 0.0 {
                continue;
            }

            if let Some(rows) = postings_by_term.get(term) {
                for row in rows.iter() {
                    let entry = accum_by_uid
                        .entry(row.event_uid.as_str())
                        .or_insert_with(|| SearchScoreAccum {
                            row,
                            score: 0.0,
                            matched_mask: 0,
                        });

                    entry.score += idf * Self::bm25_term_score(row.tf, row.doc_len, avgdl, k1, b);
                    entry.matched_mask |= 1u64 << idx;
                }
            }
        }

        // Threshold filtering before any hydration work.
        let mut fast_candidates = Vec::<CandidateRef<'_>>::new();
        for acc in accum_by_uid.values() {
            let matched_terms = acc.matched_mask.count_ones() as u64;
            if matched_terms < min_should_match as u64 || acc.score < min_score {
                continue;
            }
            fast_candidates.push(CandidateRef {
                row: acc.row,
                score: acc.score,
                matched_terms,
            });
        }

        if fast_candidates.is_empty() {
            return Ok((Vec::new(), 0));
        }

        let candidate_count = fast_candidates.len();
        // Partial top-k selection when over budget, then a full sort of the
        // survivors: score descending, event_uid ascending as tie-break.
        if candidate_limit < fast_candidates.len() {
            fast_candidates.select_nth_unstable_by(candidate_limit, |a, b| {
                b.score
                    .total_cmp(&a.score)
                    .then_with(|| a.row.event_uid.cmp(&b.row.event_uid))
            });
            fast_candidates.truncate(candidate_limit);
        }
        fast_candidates.sort_by(|a, b| {
            b.score
                .total_cmp(&a.score)
                .then_with(|| a.row.event_uid.cmp(&b.row.event_uid))
        });

        // Hydrate metadata in chunks and apply the row-level filters; stop as
        // soon as `limit` rows survive.
        let mut fast_rows = Vec::<SearchRow>::new();
        let hydrate_chunk_size = (limit as usize).saturating_mul(8).max(128);
        let mut offset = 0usize;
        while offset < fast_candidates.len() && fast_rows.len() < limit as usize {
            let end = (offset + hydrate_chunk_size).min(fast_candidates.len());
            let event_uids: Vec<String> = fast_candidates[offset..end]
                .iter()
                .map(|row| row.row.event_uid.clone())
                .collect();
            let doc_extras = self
                .load_search_doc_extras(&event_uids, use_document_codex_flag)
                .await?;

            for row in &fast_candidates[offset..end] {
                // Candidates missing from hydration (e.g. deleted docs) drop out.
                let Some(extra) = doc_extras.get(row.row.event_uid.as_str()) else {
                    continue;
                };
                if !Self::passes_search_doc_filters(
                    extra,
                    include_tool_events,
                    event_kinds,
                    exclude_codex_mcp,
                    session_id,
                ) {
                    continue;
                }

                fast_rows.push(SearchRow {
                    event_uid: row.row.event_uid.clone(),
                    session_id: extra.session_id.clone(),
                    source_name: extra.source_name.clone(),
                    provider: extra.provider.clone(),
                    event_class: extra.event_class.clone(),
                    payload_type: extra.payload_type.clone(),
                    actor_role: extra.actor_role.clone(),
                    name: extra.name.clone(),
                    phase: extra.phase.clone(),
                    source_ref: extra.source_ref.clone(),
                    doc_len: extra.doc_len,
                    text_preview: extra.text_preview.clone(),
                    score: row.score,
                    matched_terms: row.matched_terms,
                });

                if fast_rows.len() >= limit as usize {
                    break;
                }
            }
            offset = end;
        }

        Ok((fast_rows, candidate_count))
    }
1826
1827 fn conversation_candidate_limit(limit: u16) -> usize {
1828 (limit as usize)
1829 .saturating_mul(CONVERSATION_CANDIDATE_MULTIPLIER)
1830 .clamp(CONVERSATION_CANDIDATE_MIN, CONVERSATION_CANDIDATE_MAX)
1831 }
1832
1833 fn now_unix_ms() -> i64 {
1834 SystemTime::now()
1835 .duration_since(UNIX_EPOCH)
1836 .map(|d| d.as_millis() as i64)
1837 .unwrap_or_default()
1838 }
1839
    /// Builds the shared filter fragments for conversation-level postings
    /// queries.
    ///
    /// Returns `(docs_join_sql, prewhere_sql, where_sql)`: a join clause
    /// against `search_documents` (emitted only when time filters need the
    /// document's `ingested_at`), the PREWHERE over the postings table, and
    /// the WHERE over the joined documents.
    fn build_conversation_postings_filter_sql(
        &self,
        terms: &[String],
        include_tool_events: bool,
        exclude_codex_mcp: bool,
        from_unix_ms: Option<i64>,
        to_unix_ms: Option<i64>,
        recent_from_unix_ms: Option<i64>,
        candidate_session_ids: Option<&[String]>,
    ) -> (String, String, String) {
        let terms_array_sql = sql_array_strings(terms);
        let mut postings_filters = vec![format!("p.term IN {}", terms_array_sql)];
        let mut document_filters = Vec::new();

        // Time bounds live on the documents table (ingested_at), hence the
        // optional join below. Range is [from, to).
        if let Some(from_unix_ms) = from_unix_ms {
            document_filters.push(format!(
                "toUnixTimestamp64Milli(d.ingested_at) >= {from_unix_ms}"
            ));
        }
        if let Some(to_unix_ms) = to_unix_ms {
            document_filters.push(format!(
                "toUnixTimestamp64Milli(d.ingested_at) < {to_unix_ms}"
            ));
        }
        if let Some(recent_from_unix_ms) = recent_from_unix_ms {
            document_filters.push(format!(
                "toUnixTimestamp64Milli(d.ingested_at) >= {recent_from_unix_ms}"
            ));
        }

        if include_tool_events {
            postings_filters.push("p.payload_type != 'token_count'".to_string());
        } else {
            postings_filters
                .push("p.event_class IN ('message', 'reasoning', 'event_msg')".to_string());
            postings_filters.push(
                "p.payload_type NOT IN ('token_count', 'task_started', 'task_complete', 'turn_aborted', 'item_completed')"
                    .to_string(),
            );
        }

        if exclude_codex_mcp {
            postings_filters.push("p.source_name != 'codex-mcp'".to_string());
            postings_filters.push("lowerUTF8(p.name) NOT IN ('search', 'open')".to_string());
        }

        // Optional narrowing to a pre-computed candidate session set.
        if let Some(candidate_session_ids) = candidate_session_ids {
            if !candidate_session_ids.is_empty() {
                postings_filters.push(format!(
                    "p.session_id IN {}",
                    sql_array_strings(candidate_session_ids)
                ));
            }
        }

        let prewhere_sql = postings_filters.join("\n              AND ");
        let where_sql = if document_filters.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", document_filters.join("\n              AND "))
        };
        let docs_join_sql = if document_filters.is_empty() {
            String::new()
        } else {
            let documents_table = self.table_ref("search_documents");
            format!("ANY INNER JOIN {documents_table} AS d ON d.event_uid = p.doc_id")
        };
        (docs_join_sql, prewhere_sql, where_sql)
    }
1909
    /// Builds the persistent conversation-candidate query: scores sessions
    /// from the pre-aggregated `search_conversation_terms` table
    /// (IDF * log1p(tf_sum)), restricted to sessions with at least one
    /// eligible posting under the current filters, optionally joined against
    /// the conversation-mode subquery.
    ///
    /// Returns `RepoError::invalid_argument` for an empty `terms` slice.
    fn build_search_conversation_candidates_sql(
        &self,
        terms: &[String],
        idf_by_term: &HashMap<String, f64>,
        include_tool_events: bool,
        exclude_codex_mcp: bool,
        min_should_match: u16,
        limit: usize,
        from_unix_ms: Option<i64>,
        to_unix_ms: Option<i64>,
        mode: Option<ConversationMode>,
    ) -> RepoResult<String> {
        if terms.is_empty() {
            return Err(RepoError::invalid_argument(
                "cannot build candidate query with empty terms",
            ));
        }

        let postings_table = self.table_ref("search_postings");
        let conversation_terms_table = self.table_ref("search_conversation_terms");
        let terms_array_sql = sql_array_strings(terms);
        // IDF weights positionally aligned with q_terms for transform().
        let idf_vals: Vec<f64> = terms
            .iter()
            .map(|t| *idf_by_term.get(t).unwrap_or(&0.0))
            .collect();
        let idf_array_sql = sql_array_f64(&idf_vals);
        let (docs_join_sql, prewhere_sql, where_sql) = self.build_conversation_postings_filter_sql(
            terms,
            include_tool_events,
            exclude_codex_mcp,
            from_unix_ms,
            to_unix_ms,
            None,
            None,
        );

        // Optional conversation-mode join and filter.
        let (mode_join_sql, mode_filter_sql) = if let Some(selected_mode) = mode {
            let mode_subquery = self.mode_subquery();
            let mode_filter_sql = Self::mode_filter_clause(Some(selected_mode))
                .map(|clause| format!("AND {clause}"))
                .unwrap_or_default();
            (
                format!("ANY LEFT JOIN ({mode_subquery}) AS m ON m.session_id = c.session_id"),
                mode_filter_sql,
            )
        } else {
            (String::new(), String::new())
        };

        Ok(format!(
            "WITH
                {terms_array_sql} AS q_terms,
                {idf_array_sql} AS q_idf
            SELECT
                c.session_id AS session_id,
                c.score AS score,
                toUInt16(c.matched_terms) AS matched_terms
            FROM (
                SELECT
                    ct.session_id,
                    sum(transform(ct.term, q_terms, q_idf, 0.0) * log1p(toFloat64(ct.tf_sum))) AS score,
                    toUInt16(countDistinct(ct.term)) AS matched_terms
                FROM {conversation_terms_table} AS ct
                ANY INNER JOIN (
                    SELECT DISTINCT p.session_id
                    FROM {postings_table} AS p
                    {docs_join_sql}
                    PREWHERE {prewhere_sql}
                    {where_sql}
                ) AS eligible ON eligible.session_id = ct.session_id
                WHERE ct.term IN {terms_array_sql}
                GROUP BY ct.session_id
            ) AS c
            {mode_join_sql}
            WHERE c.matched_terms >= {min_should_match}
                {mode_filter_sql}
            ORDER BY c.score DESC, c.session_id ASC
            LIMIT {limit}
            FORMAT JSONEachRow",
            conversation_terms_table = conversation_terms_table,
            postings_table = postings_table,
            docs_join_sql = docs_join_sql,
            prewhere_sql = prewhere_sql,
            where_sql = where_sql,
            mode_join_sql = mode_join_sql,
            mode_filter_sql = mode_filter_sql,
            min_should_match = min_should_match,
            limit = limit,
        ))
    }
2000
    /// Builds the recent-window variant of the conversation-candidate query:
    /// scores sessions directly from raw postings (IDF * log1p(tf)) within
    /// the last `CONVERSATION_RECENT_WINDOW_MS`, catching sessions whose
    /// pre-aggregated conversation terms may not be materialized yet.
    ///
    /// Returns `RepoError::invalid_argument` for an empty `terms` slice.
    fn build_search_conversation_recent_candidates_sql(
        &self,
        terms: &[String],
        idf_by_term: &HashMap<String, f64>,
        include_tool_events: bool,
        exclude_codex_mcp: bool,
        min_should_match: u16,
        limit: usize,
        from_unix_ms: Option<i64>,
        to_unix_ms: Option<i64>,
        mode: Option<ConversationMode>,
    ) -> RepoResult<String> {
        if terms.is_empty() {
            return Err(RepoError::invalid_argument(
                "cannot build recent candidate query with empty terms",
            ));
        }

        let postings_table = self.table_ref("search_postings");
        let terms_array_sql = sql_array_strings(terms);
        // IDF weights positionally aligned with q_terms for transform().
        let idf_vals: Vec<f64> = terms
            .iter()
            .map(|t| *idf_by_term.get(t).unwrap_or(&0.0))
            .collect();
        let idf_array_sql = sql_array_f64(&idf_vals);
        // Recency floor is now - window, tightened by any caller `from` bound.
        let now_unix_ms = Self::now_unix_ms();
        let recent_floor = now_unix_ms.saturating_sub(CONVERSATION_RECENT_WINDOW_MS);
        let recent_from_unix_ms = match from_unix_ms {
            Some(from) => from.max(recent_floor),
            None => recent_floor,
        };
        let (docs_join_sql, prewhere_sql, where_sql) = self.build_conversation_postings_filter_sql(
            terms,
            include_tool_events,
            exclude_codex_mcp,
            from_unix_ms,
            to_unix_ms,
            Some(recent_from_unix_ms),
            None,
        );

        // Optional conversation-mode join and filter.
        let (mode_join_sql, mode_filter_sql) = if let Some(selected_mode) = mode {
            let mode_subquery = self.mode_subquery();
            let mode_filter_sql = Self::mode_filter_clause(Some(selected_mode))
                .map(|clause| format!("AND {clause}"))
                .unwrap_or_default();
            (
                format!("ANY LEFT JOIN ({mode_subquery}) AS m ON m.session_id = c.session_id"),
                mode_filter_sql,
            )
        } else {
            (String::new(), String::new())
        };

        Ok(format!(
            "WITH
                {terms_array_sql} AS q_terms,
                {idf_array_sql} AS q_idf
            SELECT
                c.session_id AS session_id,
                c.score AS score,
                toUInt16(c.matched_terms) AS matched_terms
            FROM (
                SELECT
                    p.session_id AS session_id,
                    sum(transform(toString(p.term), q_terms, q_idf, 0.0) * log1p(toFloat64(p.tf))) AS score,
                    toUInt16(countDistinct(p.term)) AS matched_terms
                FROM {postings_table} AS p
                {docs_join_sql}
                PREWHERE {prewhere_sql}
                {where_sql}
                GROUP BY p.session_id
            ) AS c
            {mode_join_sql}
            WHERE c.matched_terms >= {min_should_match}
                {mode_filter_sql}
            ORDER BY c.score DESC, c.session_id ASC
            LIMIT {limit}
            FORMAT JSONEachRow",
            postings_table = postings_table,
            docs_join_sql = docs_join_sql,
            prewhere_sql = prewhere_sql,
            where_sql = where_sql,
            mode_join_sql = mode_join_sql,
            mode_filter_sql = mode_filter_sql,
            min_should_match = min_should_match,
            limit = limit,
        ))
    }
2090
    /// Produces the candidate session set for conversation search: runs the
    /// persistent (pre-aggregated) candidate query first and, when it did not
    /// saturate its limit, augments it with the recent-window query. Rows
    /// from both sources are merged per session, keeping the best score and
    /// the highest matched-term count.
    async fn fetch_conversation_candidates(
        &self,
        terms: &[String],
        idf_by_term: &HashMap<String, f64>,
        include_tool_events: bool,
        exclude_codex_mcp: bool,
        min_should_match: u16,
        limit: u16,
        from_unix_ms: Option<i64>,
        to_unix_ms: Option<i64>,
        mode: Option<ConversationMode>,
    ) -> RepoResult<ConversationCandidateSet> {
        let candidate_limit = Self::conversation_candidate_limit(limit);
        let persistent_sql = self.build_search_conversation_candidates_sql(
            terms,
            idf_by_term,
            include_tool_events,
            exclude_codex_mcp,
            min_should_match,
            candidate_limit,
            from_unix_ms,
            to_unix_ms,
            mode,
        )?;
        let mut persistent_rows: Vec<ConversationCandidateRow> =
            self.map_backend(self.ch.query_rows(&persistent_sql, None).await)?;
        // If the persistent query filled its limit the set may be truncated;
        // skip the recent pass and report truncation to the caller.
        let truncated = persistent_rows.len() >= candidate_limit;
        if truncated {
            return Ok(ConversationCandidateSet {
                rows: persistent_rows,
                truncated: true,
            });
        }

        let recent_sql = self.build_search_conversation_recent_candidates_sql(
            terms,
            idf_by_term,
            include_tool_events,
            exclude_codex_mcp,
            min_should_match,
            CONVERSATION_RECENT_CANDIDATE_LIMIT,
            from_unix_ms,
            to_unix_ms,
            mode,
        )?;
        let recent_rows: Vec<ConversationCandidateRow> =
            self.map_backend(self.ch.query_rows(&recent_sql, None).await)?;

        // Merge per session: keep the maximum score and the maximum
        // matched_terms observed across the two sources.
        let mut by_session = HashMap::<String, (f64, u16)>::new();
        for row in persistent_rows.drain(..) {
            by_session.insert(row.session_id, (row.score, row.matched_terms));
        }
        for row in recent_rows {
            let entry = by_session
                .entry(row.session_id)
                .or_insert((row.score, row.matched_terms));
            if row.score > entry.0 {
                entry.0 = row.score;
            }
            if row.matched_terms > entry.1 {
                entry.1 = row.matched_terms;
            }
        }

        // Deterministic ordering: score descending, session_id ascending.
        let mut rows = by_session
            .into_iter()
            .map(
                |(session_id, (score, matched_terms))| ConversationCandidateRow {
                    session_id,
                    score,
                    matched_terms,
                },
            )
            .collect::<Vec<_>>();
        rows.sort_by(|a, b| {
            b.score
                .total_cmp(&a.score)
                .then_with(|| a.session_id.cmp(&b.session_id))
        });
        // Cap the merged set so the recent pass cannot blow the budget.
        let max_rows = candidate_limit.saturating_add(CONVERSATION_RECENT_CANDIDATE_LIMIT);
        if rows.len() > max_rows {
            rows.truncate(max_rows);
        }

        Ok(ConversationCandidateSet {
            rows,
            truncated: false,
        })
    }
2180
    /// Renders the session-level BM25 search query as a single ClickHouse
    /// SQL statement.
    ///
    /// Query shape (inside-out):
    /// 1. innermost subquery scores each posting-matched document (event)
    ///    with the Okapi BM25 formula, using the caller-supplied per-term
    ///    IDF values and corpus `avgdl`;
    /// 2. the middle subquery sums event scores per session and keeps the
    ///    best-scoring event (`argMax`) as the representative hit;
    /// 3. the outer query joins session summary metadata, applies the
    ///    `min_should_match` / `min_score` thresholds and the optional mode
    ///    filter, then orders by score with session_id as tie-breaker.
    ///
    /// All inputs are interpolated directly into the SQL text; terms are
    /// escaped via `sql_array_strings`. Returns an error if `terms` is
    /// empty (the WITH arrays would be malformed).
    fn build_search_conversations_sql(
        &self,
        terms: &[String],
        idf_by_term: &HashMap<String, f64>,
        avgdl: f64,
        include_tool_events: bool,
        exclude_codex_mcp: bool,
        min_should_match: u16,
        min_score: f64,
        limit: u16,
        from_unix_ms: Option<i64>,
        to_unix_ms: Option<i64>,
        mode: Option<ConversationMode>,
        candidate_session_ids: Option<&[String]>,
    ) -> RepoResult<String> {
        if terms.is_empty() {
            return Err(RepoError::invalid_argument(
                "cannot build search query with empty terms",
            ));
        }

        let postings_table = self.table_ref("search_postings");
        let session_summary_table = self.table_ref("v_session_summary");
        let terms_array_sql = sql_array_strings(terms);
        // Terms missing from the IDF map contribute 0 weight (they still
        // count toward matched_terms if present in postings).
        let idf_vals: Vec<f64> = terms
            .iter()
            .map(|t| *idf_by_term.get(t).unwrap_or(&0.0))
            .collect();
        let idf_array_sql = sql_array_f64(&idf_vals);
        let (docs_join_sql, prewhere_sql, where_sql) = self.build_conversation_postings_filter_sql(
            terms,
            include_tool_events,
            exclude_codex_mcp,
            from_unix_ms,
            to_unix_ms,
            None,
            candidate_session_ids,
        );
        // Mode filtering requires an extra join against the mode subquery;
        // both pieces stay empty when no mode was requested.
        let (mode_join_sql, mode_filter_sql) = if let Some(selected_mode) = mode {
            let mode_subquery = self.mode_subquery();
            let mode_filter_sql = Self::mode_filter_clause(Some(selected_mode))
                .map(|clause| format!("AND {clause}"))
                .unwrap_or_default();
            (
                format!("ANY LEFT JOIN ({mode_subquery}) AS m ON m.session_id = c.session_id"),
                mode_filter_sql,
            )
        } else {
            (String::new(), String::new())
        };

        // Guard BM25 parameters against degenerate configuration values.
        let k1 = self.cfg.bm25_k1.max(0.01);
        let b = self.cfg.bm25_b.clamp(0.0, 1.0);
        // With <= 63 query terms each term can own one bit of a UInt64, so
        // matched-term counting becomes cheap bitmask aggregation
        // (groupBitOr + bitCount); otherwise fall back to array-based
        // distinct counting.
        let use_term_bitmask = terms.len() <= 63;
        let term_bits_with_sql = if use_term_bitmask {
            ",\n arrayMap(idx -> toUInt64(bitShiftLeft(toUInt64(1), idx - 1)), arrayEnumerate(q_terms)) AS q_bits"
                .to_string()
        } else {
            String::new()
        };
        let outer_matched_terms_sql = if use_term_bitmask {
            "bitCount(groupBitOr(e.term_mask))".to_string()
        } else {
            "length(arrayDistinct(arrayFlatten(groupArray(e.matched_terms_arr))))".to_string()
        };
        // `transform` maps each posting term to its bit (or 0 when the term
        // is not one of the query terms).
        let inner_matched_terms_sql = if use_term_bitmask {
            "groupBitOr(transform(toString(p.term), q_terms, q_bits, toUInt64(0))) AS term_mask,"
                .to_string()
        } else {
            "groupUniqArray(toString(p.term)) AS matched_terms_arr,".to_string()
        };

        // Note: session summary fields are blanked ('' / 0) when the ANY
        // LEFT JOIN finds no matching summary row (s.session_id = '').
        Ok(format!(
            "WITH
 {k1:.6} AS k1,
 {b:.6} AS b,
 greatest({avgdl:.6}, 1.0) AS avgdl,
 {terms_array_sql} AS q_terms,
 {idf_array_sql} AS q_idf{term_bits_with_sql}
 SELECT
 c.session_id AS session_id,
 if(s.session_id = '', '', toString(s.first_event_time)) AS first_event_time,
 if(
 s.session_id = '',
 toInt64(0),
 toInt64(toUnixTimestamp64Milli(s.first_event_time))
 ) AS first_event_unix_ms,
 if(s.session_id = '', '', toString(s.last_event_time)) AS last_event_time,
 if(
 s.session_id = '',
 toInt64(0),
 toInt64(toUnixTimestamp64Milli(s.last_event_time))
 ) AS last_event_unix_ms,
 c.provider AS provider,
 c.score AS score,
 toUInt16(c.matched_terms) AS matched_terms,
 toUInt32(c.event_count_considered) AS event_count_considered,
 c.best_event_uid AS best_event_uid
 FROM (
 SELECT
 e.session_id AS session_id,
 sum(e.event_score) AS score,
 {outer_matched_terms_sql} AS matched_terms,
 count() AS event_count_considered,
 argMax(e.provider, e.event_score) AS provider,
 argMax(e.event_uid, e.event_score) AS best_event_uid
 FROM (
 SELECT
 p.doc_id AS event_uid,
 any(p.session_id) AS session_id,
 any(p.provider) AS provider,
 {inner_matched_terms_sql}
 sum(
 transform(toString(p.term), q_terms, q_idf, 0.0)
 *
 (
 (toFloat64(p.tf) * (k1 + 1.0))
 /
 (toFloat64(p.tf) + k1 * (1.0 - b + b * (toFloat64(p.doc_len) / avgdl)))
 )
 ) AS event_score
 FROM {postings_table} AS p
 {docs_join_sql}
 PREWHERE {prewhere_sql}
 {where_sql}
 GROUP BY p.doc_id
 ) AS e
 GROUP BY e.session_id
 ) AS c
 ANY LEFT JOIN {session_summary_table} AS s ON s.session_id = c.session_id
 {mode_join_sql}
 WHERE c.matched_terms >= {min_should_match}
 AND c.score >= {min_score:.6}
 {mode_filter_sql}
 ORDER BY c.score DESC, c.session_id ASC
 LIMIT {limit}
 FORMAT JSONEachRow",
            postings_table = postings_table,
            docs_join_sql = docs_join_sql,
            prewhere_sql = prewhere_sql,
            where_sql = where_sql,
            outer_matched_terms_sql = outer_matched_terms_sql,
            inner_matched_terms_sql = inner_matched_terms_sql,
            term_bits_with_sql = term_bits_with_sql,
            session_summary_table = session_summary_table,
            mode_join_sql = mode_join_sql,
            mode_filter_sql = mode_filter_sql,
            min_should_match = min_should_match,
            min_score = min_score,
            limit = limit,
        ))
    }
2333
2334 async fn fetch_conversation_snippets(
2335 &self,
2336 event_uids: &[String],
2337 ) -> RepoResult<HashMap<String, String>> {
2338 if event_uids.is_empty() {
2339 return Ok(HashMap::new());
2340 }
2341
2342 let documents_table = self.table_ref("search_documents");
2343 let event_uids_sql = sql_array_strings(event_uids);
2344 let sql = format!(
2345 "SELECT
2346 event_uid,
2347 leftUTF8(any(text_content), {preview}) AS snippet
2348 FROM {documents_table}
2349 WHERE event_uid IN {event_uids_sql}
2350 GROUP BY event_uid
2351 FORMAT JSONEachRow",
2352 preview = self.cfg.preview_chars,
2353 documents_table = documents_table,
2354 event_uids_sql = event_uids_sql,
2355 );
2356 let rows: Vec<ConversationSnippetRow> =
2357 self.map_backend(self.ch.query_rows(&sql, None).await)?;
2358 let mut by_uid = HashMap::new();
2359 for row in rows {
2360 by_uid.insert(row.event_uid, row.snippet);
2361 }
2362 Ok(by_uid)
2363 }
2364
2365 async fn load_session_time_bounds(
2366 &self,
2367 session_ids: &[String],
2368 ) -> RepoResult<HashMap<String, SessionTimeBounds>> {
2369 let mut unique_session_ids = session_ids.to_vec();
2370 unique_session_ids.sort_unstable();
2371 unique_session_ids.dedup();
2372 if unique_session_ids.is_empty() {
2373 return Ok(HashMap::new());
2374 }
2375
2376 let session_summary_table = self.table_ref("v_session_summary");
2377 let session_ids_sql = sql_array_strings(&unique_session_ids);
2378 let sql = format!(
2379 "SELECT
2380 s.session_id AS session_id,
2381 toString(s.first_event_time) AS first_event_time,
2382 toString(s.last_event_time) AS last_event_time
2383 FROM {session_summary_table} AS s
2384 WHERE s.session_id IN {session_ids_sql}
2385 FORMAT JSONEachRow",
2386 session_summary_table = session_summary_table,
2387 session_ids_sql = session_ids_sql,
2388 );
2389
2390 let rows: Vec<SessionTimeBoundsRow> =
2391 match self.map_backend(self.ch.query_rows(&sql, None).await) {
2392 Ok(rows) => rows,
2393 Err(err) => {
2394 warn!("failed to load session time bounds: {}", err);
2395 return Ok(HashMap::new());
2396 }
2397 };
2398 let mut bounds_by_session = HashMap::new();
2399 for row in rows {
2400 bounds_by_session.insert(
2401 row.session_id,
2402 SessionTimeBounds {
2403 first_event_time: row.first_event_time,
2404 last_event_time: row.last_event_time,
2405 },
2406 );
2407 }
2408 Ok(bounds_by_session)
2409 }
2410
2411 async fn map_search_rows_to_hits(
2412 &self,
2413 rows: Vec<SearchRow>,
2414 ) -> RepoResult<Vec<SearchEventHit>> {
2415 let session_ids = rows
2416 .iter()
2417 .map(|row| row.session_id.clone())
2418 .collect::<Vec<_>>();
2419 let session_time_bounds = self.load_session_time_bounds(&session_ids).await?;
2420
2421 Ok(rows
2422 .into_iter()
2423 .enumerate()
2424 .map(|(idx, row)| {
2425 let session_id = row.session_id;
2426 let (first_event_time, last_event_time) = session_time_bounds
2427 .get(session_id.as_str())
2428 .map(|bounds| {
2429 (
2430 bounds.first_event_time.clone(),
2431 bounds.last_event_time.clone(),
2432 )
2433 })
2434 .unwrap_or_default();
2435
2436 SearchEventHit {
2437 rank: idx + 1,
2438 event_uid: row.event_uid,
2439 session_id,
2440 first_event_time,
2441 last_event_time,
2442 source_name: row.source_name,
2443 provider: row.provider,
2444 score: row.score,
2445 matched_terms: row.matched_terms,
2446 doc_len: row.doc_len,
2447 event_class: row.event_class,
2448 payload_type: row.payload_type,
2449 actor_role: row.actor_role,
2450 name: row.name,
2451 phase: row.phase,
2452 source_ref: row.source_ref,
2453 text_preview: row.text_preview,
2454 }
2455 })
2456 .collect())
2457 }
2458
2459 async fn fetch_conversation_session_metadata(
2460 &self,
2461 session_ids: &[String],
2462 ) -> RepoResult<HashMap<String, ConversationSessionMetadataRow>> {
2463 if session_ids.is_empty() {
2464 return Ok(HashMap::new());
2465 }
2466
2467 let events_table = self.table_ref("events");
2468 let session_ids_sql = sql_array_strings(session_ids);
2469 let sql = format!(
2470 "SELECT
2471 session_id,
2472 argMax(provider, event_ts) AS provider,
2473 ifNull(argMax(nullIf(JSONExtractString(payload_json, 'slug'), ''), event_ts), '') AS session_slug,
2474 ifNull(
2475 argMax(
2476 coalesce(
2477 nullIf(JSONExtractString(payload_json, 'summary'), ''),
2478 nullIf(JSONExtractString(payload_json, 'title'), ''),
2479 nullIf(JSONExtractString(payload_json, 'name'), '')
2480 ),
2481 event_ts
2482 ),
2483 ''
2484 ) AS session_summary
2485 FROM {events_table}
2486 WHERE event_kind = 'session_meta'
2487 AND session_id IN {session_ids_sql}
2488 GROUP BY session_id
2489 FORMAT JSONEachRow",
2490 events_table = events_table,
2491 session_ids_sql = session_ids_sql,
2492 );
2493
2494 let rows: Vec<ConversationSessionMetadataRow> =
2495 self.map_backend(self.ch.query_rows(&sql, None).await)?;
2496 let mut by_session = HashMap::new();
2497 for row in rows {
2498 by_session.insert(row.session_id.clone(), row);
2499 }
2500 Ok(by_session)
2501 }
2502
    /// Persists one `search_query_log` row plus one `search_hit_log` row
    /// per hit for observability/benchmarking.
    ///
    /// Best-effort: every insert failure is logged with `warn!` and
    /// swallowed so logging can never fail a search. When
    /// `cfg.async_log_writes` is set, writes run fire-and-forget on a
    /// spawned task (the caller does not wait for them); otherwise they are
    /// awaited inline.
    async fn log_search_events(
        &self,
        query_id: &str,
        source: &str,
        raw_query: &str,
        session_hint: &str,
        terms: &[String],
        limit: u16,
        min_should_match: u16,
        min_score: f64,
        include_tool_events: bool,
        event_kinds: Option<&[SearchEventKind]>,
        exclude_codex_mcp: bool,
        took_ms: u32,
        hits: &[SearchEventHit],
        docs: u64,
        avgdl: f64,
    ) {
        // None => empty list in the logged metadata.
        let event_kinds = event_kinds
            .map(|kinds| kinds.iter().map(|kind| kind.as_str()).collect::<Vec<_>>())
            .unwrap_or_default();
        // Corpus stats and BM25 parameters are recorded so past queries can
        // be replayed/compared; encoding failure degrades to "{}".
        let metadata_json = match serde_json::to_string(&json!({
            "docs": docs,
            "avgdl": avgdl,
            "k1": self.cfg.bm25_k1,
            "b": self.cfg.bm25_b,
            "event_kinds": event_kinds
        })) {
            Ok(value) => value,
            Err(err) => {
                warn!("failed to encode search metadata: {}", err);
                "{}".to_string()
            }
        };

        // Booleans are stored as 0/1 integers in the log table.
        let query_row = json!({
            "query_id": query_id,
            "source": source,
            "session_hint": session_hint,
            "raw_query": raw_query,
            "normalized_terms": terms,
            "term_count": terms.len() as u16,
            "result_limit": limit,
            "min_should_match": min_should_match,
            "min_score": min_score,
            "include_tool_events": if include_tool_events { 1 } else { 0 },
            "exclude_codex_mcp": if exclude_codex_mcp { 1 } else { 0 },
            "response_ms": took_ms,
            "result_count": hits.len() as u16,
            "metadata_json": metadata_json,
        });

        let hit_rows: Vec<Value> = hits
            .iter()
            .map(|hit| {
                json!({
                    "query_id": query_id,
                    "rank": hit.rank as u16,
                    "event_uid": hit.event_uid,
                    "session_id": hit.session_id,
                    "source_name": hit.source_name,
                    "provider": hit.provider,
                    "score": hit.score,
                    "matched_terms": hit.matched_terms as u16,
                    "doc_len": hit.doc_len,
                    "event_class": hit.event_class,
                    "payload_type": hit.payload_type,
                    "actor_role": hit.actor_role,
                    "name": hit.name,
                    "source_ref": hit.source_ref,
                })
            })
            .collect();

        // Clone the client so the spawned task owns its own handle.
        let ch = self.ch.clone();
        if self.cfg.async_log_writes {
            tokio::spawn(async move {
                if let Err(err) = ch.insert_json_rows("search_query_log", &[query_row]).await {
                    warn!("failed to write search_query_log: {}", err);
                }
                if !hit_rows.is_empty() {
                    if let Err(err) = ch.insert_json_rows("search_hit_log", &hit_rows).await {
                        warn!("failed to write search_hit_log: {}", err);
                    }
                }
            });
        } else {
            if let Err(err) = self
                .ch
                .insert_json_rows("search_query_log", &[query_row])
                .await
            {
                warn!("failed to write search_query_log: {}", err);
            }
            if !hit_rows.is_empty() {
                if let Err(err) = self.ch.insert_json_rows("search_hit_log", &hit_rows).await {
                    warn!("failed to write search_hit_log: {}", err);
                }
            }
        }
    }
2604 }
2605
2606 #[async_trait]
2607 impl ConversationRepository for ClickHouseConversationRepository {
2608 async fn list_conversations(
2609 &self,
2610 filter: ConversationListFilter,
2611 page: PageRequest,
2612 ) -> RepoResult<Page<ConversationSummary>> {
2613 Self::validate_time_bounds(filter.from_unix_ms, filter.to_unix_ms)?;
2614
2615 let limit = page.normalized_limit(self.cfg.max_results);
2616 let filter_sig = Self::conversation_filter_sig(&filter);
2617
2618 let cursor = if let Some(token) = page.cursor.as_deref() {
2619 let cursor: ConversationCursor = decode_cursor(token)?;
2620 if cursor.filter_sig != filter_sig {
2621 return Err(RepoError::invalid_cursor(
2622 "cursor does not match current conversation filter",
2623 ));
2624 }
2625 Some(cursor)
2626 } else {
2627 None
2628 };
2629
2630 let session_summary = self.table_ref("v_session_summary");
2631 let mode_subquery = self.mode_subquery();
2632
2633 let mut where_clauses = vec!["1 = 1".to_string()];
2634
2635 if let Some(from_unix_ms) = filter.from_unix_ms {
2636 where_clauses.push(format!(
2637 "toUnixTimestamp64Milli(s.last_event_time) >= {from_unix_ms}"
2638 ));
2639 }
2640 if let Some(to_unix_ms) = filter.to_unix_ms {
2641 where_clauses.push(format!(
2642 "toUnixTimestamp64Milli(s.last_event_time) < {to_unix_ms}"
2643 ));
2644 }
2645 if let Some(mode_clause) = Self::mode_filter_clause(filter.mode) {
2646 where_clauses.push(mode_clause);
2647 }
2648
2649 if let Some(cursor) = &cursor {
2650 where_clauses.push(format!(
2651 "(toUnixTimestamp64Milli(s.last_event_time) < {} OR (toUnixTimestamp64Milli(s.last_event_time) = {} AND s.session_id < {}))",
2652 cursor.last_event_unix_ms,
2653 cursor.last_event_unix_ms,
2654 sql_quote(&cursor.session_id)
2655 ));
2656 }
2657
2658 let where_sql = where_clauses.join("\n AND ");
2659
2660 let query = format!(
2661 "SELECT
2662 s.session_id,
2663 toString(s.first_event_time) AS first_event_time,
2664 toInt64(toUnixTimestamp64Milli(s.first_event_time)) AS first_event_unix_ms,
2665 toString(s.last_event_time) AS last_event_time,
2666 toInt64(toUnixTimestamp64Milli(s.last_event_time)) AS last_event_unix_ms,
2667 toUInt32(s.total_turns) AS total_turns,
2668 toUInt64(s.total_events) AS total_events,
2669 toUInt64(s.user_messages) AS user_messages,
2670 toUInt64(s.assistant_messages) AS assistant_messages,
2671 toUInt64(s.tool_calls) AS tool_calls,
2672 toUInt64(s.tool_results) AS tool_results,
2673 ifNull(m.mode, 'chat') AS mode
2674 FROM {session_summary} AS s
2675 LEFT JOIN ({mode_subquery}) AS m ON m.session_id = s.session_id
2676 WHERE {where_sql}
2677 ORDER BY s.last_event_time DESC, s.session_id DESC
2678 LIMIT {limit_plus}
2679 FORMAT JSONEachRow",
2680 session_summary = session_summary,
2681 mode_subquery = mode_subquery,
2682 where_sql = where_sql,
2683 limit_plus = (limit as usize) + 1,
2684 );
2685
2686 let rows: Vec<ConversationSummaryRow> =
2687 self.map_backend(self.ch.query_rows(&query, None).await)?;
2688
2689 let mut items: Vec<ConversationSummary> = rows
2690 .iter()
2691 .take(limit as usize)
2692 .cloned()
2693 .map(Self::map_conversation_row)
2694 .collect();
2695
2696 let next_cursor = if rows.len() > limit as usize {
2697 if let Some(last) = items.last() {
2698 Some(encode_cursor(&ConversationCursor {
2699 last_event_unix_ms: last.last_event_unix_ms,
2700 session_id: last.session_id.clone(),
2701 filter_sig,
2702 })?)
2703 } else {
2704 None
2705 }
2706 } else {
2707 None
2708 };
2709
2710 Ok(Page {
2711 items: std::mem::take(&mut items),
2712 next_cursor,
2713 })
2714 }
2715
2716 async fn get_conversation(
2717 &self,
2718 session_id: &str,
2719 opts: ConversationDetailOptions,
2720 ) -> RepoResult<Option<Conversation>> {
2721 Self::validate_session_id(session_id)?;
2722
2723 let Some(summary) = self.load_conversation_summary(session_id).await? else {
2724 return Ok(None);
2725 };
2726
2727 let turns = if opts.include_turns {
2728 self.load_turns_for_session(session_id).await?
2729 } else {
2730 Vec::new()
2731 };
2732
2733 Ok(Some(Conversation { summary, turns }))
2734 }
2735
2736 async fn list_turns(
2737 &self,
2738 session_id: &str,
2739 filter: TurnListFilter,
2740 page: PageRequest,
2741 ) -> RepoResult<Page<TurnSummary>> {
2742 Self::validate_session_id(session_id)?;
2743
2744 let limit = page.normalized_limit(self.cfg.max_results);
2745 let filter_sig = Self::turn_filter_sig(session_id, &filter);
2746
2747 let cursor = if let Some(token) = page.cursor.as_deref() {
2748 let cursor: TurnCursor = decode_cursor(token)?;
2749 if cursor.filter_sig != filter_sig {
2750 return Err(RepoError::invalid_cursor(
2751 "cursor does not match current turn filter",
2752 ));
2753 }
2754 Some(cursor)
2755 } else {
2756 None
2757 };
2758
2759 let turn_summary = self.table_ref("v_turn_summary");
2760 let mut where_clauses = vec![format!("session_id = {}", sql_quote(session_id))];
2761
2762 if let Some(from_turn_seq) = filter.from_turn_seq {
2763 where_clauses.push(format!("turn_seq >= {from_turn_seq}"));
2764 }
2765 if let Some(to_turn_seq) = filter.to_turn_seq {
2766 where_clauses.push(format!("turn_seq <= {to_turn_seq}"));
2767 }
2768
2769 if let Some(cursor) = &cursor {
2770 if cursor.session_id != session_id {
2771 return Err(RepoError::invalid_cursor(
2772 "cursor session_id does not match requested session_id",
2773 ));
2774 }
2775 where_clauses.push(format!("turn_seq > {}", cursor.last_turn_seq));
2776 }
2777
2778 let where_sql = where_clauses.join("\n AND ");
2779 let query = format!(
2780 "SELECT
2781 session_id,
2782 toUInt32(turn_seq) AS turn_seq,
2783 ifNull(turn_id, '') AS turn_id,
2784 toString(started_at) AS started_at,
2785 toInt64(toUnixTimestamp64Milli(started_at)) AS started_at_unix_ms,
2786 toString(ended_at) AS ended_at,
2787 toInt64(toUnixTimestamp64Milli(ended_at)) AS ended_at_unix_ms,
2788 toUInt64(total_events) AS total_events,
2789 toUInt64(user_messages) AS user_messages,
2790 toUInt64(assistant_messages) AS assistant_messages,
2791 toUInt64(tool_calls) AS tool_calls,
2792 toUInt64(tool_results) AS tool_results,
2793 toUInt64(reasoning_items) AS reasoning_items
2794 FROM {turn_summary}
2795 WHERE {where_sql}
2796 ORDER BY turn_seq ASC
2797 LIMIT {limit_plus}
2798 FORMAT JSONEachRow",
2799 turn_summary = turn_summary,
2800 where_sql = where_sql,
2801 limit_plus = (limit as usize) + 1,
2802 );
2803
2804 let rows: Vec<TurnSummaryRow> = self.map_backend(self.ch.query_rows(&query, None).await)?;
2805 let items: Vec<TurnSummary> = rows
2806 .iter()
2807 .take(limit as usize)
2808 .cloned()
2809 .map(Self::map_turn_row)
2810 .collect();
2811
2812 let next_cursor = if rows.len() > limit as usize {
2813 if let Some(last) = items.last() {
2814 Some(encode_cursor(&TurnCursor {
2815 last_turn_seq: last.turn_seq,
2816 session_id: session_id.to_string(),
2817 filter_sig,
2818 })?)
2819 } else {
2820 None
2821 }
2822 } else {
2823 None
2824 };
2825
2826 Ok(Page { items, next_cursor })
2827 }
2828
    /// Fetches one turn (summary plus its full ordered event trace).
    ///
    /// Returns `Ok(None)` when no turn with the given `turn_seq` exists in
    /// the session. Two queries are issued: one against the turn summary
    /// view, one against the conversation trace view for the events.
    async fn get_turn(&self, session_id: &str, turn_seq: u32) -> RepoResult<Option<Turn>> {
        Self::validate_session_id(session_id)?;

        let turn_summary = self.table_ref("v_turn_summary");
        let summary_query = format!(
            "SELECT
 session_id,
 toUInt32(turn_seq) AS turn_seq,
 ifNull(turn_id, '') AS turn_id,
 toString(started_at) AS started_at,
 toInt64(toUnixTimestamp64Milli(started_at)) AS started_at_unix_ms,
 toString(ended_at) AS ended_at,
 toInt64(toUnixTimestamp64Milli(ended_at)) AS ended_at_unix_ms,
 toUInt64(total_events) AS total_events,
 toUInt64(user_messages) AS user_messages,
 toUInt64(assistant_messages) AS assistant_messages,
 toUInt64(tool_calls) AS tool_calls,
 toUInt64(tool_results) AS tool_results,
 toUInt64(reasoning_items) AS reasoning_items
 FROM {turn_summary}
 WHERE session_id = {} AND turn_seq = {}
 LIMIT 1
 FORMAT JSONEachRow",
            sql_quote(session_id),
            turn_seq,
        );

        let rows: Vec<TurnSummaryRow> =
            self.map_backend(self.ch.query_rows(&summary_query, None).await)?;
        // No summary row means the turn does not exist; skip the trace query.
        let Some(summary_row) = rows.into_iter().next() else {
            return Ok(None);
        };

        let trace_table = self.table_ref("v_conversation_trace");
        let events_query = format!(
            "SELECT
 session_id,
 event_uid,
 toUInt64(event_order) AS event_order,
 toUInt32(turn_seq) AS turn_seq,
 toString(event_time) AS event_time,
 actor_role,
 event_class,
 payload_type,
 call_id,
 name,
 phase,
 item_id,
 source_ref,
 text_content,
 payload_json,
 token_usage_json
 FROM {trace_table}
 WHERE session_id = {} AND turn_seq = {}
 ORDER BY event_order ASC
 FORMAT JSONEachRow",
            sql_quote(session_id),
            turn_seq,
        );

        let event_rows: Vec<TraceEventRow> =
            self.map_backend(self.ch.query_rows(&events_query, None).await)?;
        let events = event_rows.into_iter().map(Self::map_trace_event).collect();

        Ok(Some(Turn {
            summary: Self::map_turn_row(summary_row),
            events,
        }))
    }
2898
    /// Opens an event with surrounding conversation context.
    ///
    /// Locates the target event by UID, then fetches up to `before` events
    /// preceding it and `after` events following it (by `event_order`)
    /// within the same session. When `include_system_events` is false, a
    /// SQL-side filter plus a Rust-side `retain` drop low-information
    /// system rows; because that retain runs after the SQL LIMIT, fewer
    /// than `before`/`after` context events may be returned.
    ///
    /// A missing target yields `OpenContext { found: false, .. }` rather
    /// than an error.
    async fn open_event(&self, req: OpenEventRequest) -> RepoResult<OpenContext> {
        let event_uid = req.event_uid.trim();
        if event_uid.is_empty() {
            return Err(RepoError::invalid_argument("event_uid cannot be empty"));
        }
        Self::validate_event_uid(event_uid)?;

        let before = req.before.unwrap_or(self.cfg.default_context_before);
        let after = req.after.unwrap_or(self.cfg.default_context_after);
        let include_system_events = req.include_system_events.unwrap_or(false);
        let context_filter = Self::open_context_filter_clause(include_system_events);
        let trace_table = self.table_ref("v_conversation_trace");

        // Resolve the target's session and position first; everything else
        // is keyed off (session_id, event_order).
        let target_query = format!(
            "SELECT session_id, event_order, turn_seq FROM {trace_table} WHERE event_uid = {} ORDER BY event_order DESC LIMIT 1 FORMAT JSONEachRow",
            sql_quote(event_uid)
        );

        let targets: Vec<OpenTargetRow> =
            self.map_backend(self.ch.query_rows(&target_query, None).await)?;
        let Some(target) = targets.first() else {
            // Unknown event: report not-found instead of erroring.
            return Ok(OpenContext {
                found: false,
                event_uid: event_uid.to_string(),
                session_id: String::new(),
                target_event_order: 0,
                turn_seq: 0,
                before,
                after,
                events: Vec::new(),
            });
        };

        let target_row_query = format!(
            "SELECT
 session_id,
 event_uid,
 toUInt64(event_order) AS event_order,
 toUInt32(turn_seq) AS turn_seq,
 toString(event_time) AS event_time,
 actor_role,
 event_class,
 payload_type,
 call_id,
 name,
 phase,
 item_id,
 source_ref,
 text_content,
 payload_json,
 token_usage_json
 FROM {trace_table}
 WHERE session_id = {} AND event_order = {} AND event_uid = {}
 ORDER BY event_order ASC
 FORMAT JSONEachRow",
            sql_quote(&target.session_id),
            target.event_order,
            sql_quote(event_uid),
        );

        // Preceding context: fetched newest-first (DESC + LIMIT) so the
        // rows closest to the target are kept, then reversed below.
        let mut before_rows: Vec<TraceEventRow> = if before == 0 {
            Vec::new()
        } else {
            let before_query = format!(
                "SELECT
 session_id,
 event_uid,
 toUInt64(event_order) AS event_order,
 toUInt32(turn_seq) AS turn_seq,
 toString(event_time) AS event_time,
 actor_role,
 event_class,
 payload_type,
 call_id,
 name,
 phase,
 item_id,
 source_ref,
 text_content,
 payload_json,
 token_usage_json
 FROM {trace_table}
 WHERE session_id = {} AND event_order < {}{}
 ORDER BY event_order DESC
 LIMIT {}
 FORMAT JSONEachRow",
                sql_quote(&target.session_id),
                target.event_order,
                context_filter,
                before,
            );

            self.map_backend(self.ch.query_rows(&before_query, None).await)?
        };
        if !include_system_events {
            before_rows.retain(|row| {
                !Self::is_low_information_system_event(&row.actor_role, &row.payload_type)
            });
        }
        // Restore chronological (ascending event_order) order.
        before_rows.reverse();

        let target_rows: Vec<TraceEventRow> =
            self.map_backend(self.ch.query_rows(&target_row_query, None).await)?;

        let mut after_rows: Vec<TraceEventRow> = if after == 0 {
            Vec::new()
        } else {
            let after_query = format!(
                "SELECT
 session_id,
 event_uid,
 toUInt64(event_order) AS event_order,
 toUInt32(turn_seq) AS turn_seq,
 toString(event_time) AS event_time,
 actor_role,
 event_class,
 payload_type,
 call_id,
 name,
 phase,
 item_id,
 source_ref,
 text_content,
 payload_json,
 token_usage_json
 FROM {trace_table}
 WHERE session_id = {} AND event_order > {}{}
 ORDER BY event_order ASC
 LIMIT {}
 FORMAT JSONEachRow",
                sql_quote(&target.session_id),
                target.event_order,
                context_filter,
                after,
            );

            self.map_backend(self.ch.query_rows(&after_query, None).await)?
        };
        if !include_system_events {
            after_rows.retain(|row| {
                !Self::is_low_information_system_event(&row.actor_role, &row.payload_type)
            });
        }

        // Assemble before + target + after in chronological order.
        let mut rows = Vec::with_capacity(before_rows.len() + target_rows.len() + after_rows.len());
        rows.extend(before_rows);
        rows.extend(target_rows);
        rows.extend(after_rows);

        let events: Vec<OpenEvent> = rows
            .into_iter()
            .map(|row| OpenEvent {
                is_target: row.event_uid == event_uid,
                session_id: row.session_id,
                event_uid: row.event_uid,
                event_order: row.event_order,
                turn_seq: row.turn_seq,
                event_time: row.event_time,
                actor_role: row.actor_role,
                event_class: row.event_class,
                payload_type: row.payload_type,
                call_id: row.call_id,
                name: row.name,
                phase: row.phase,
                item_id: row.item_id,
                source_ref: row.source_ref,
                text_content: row.text_content,
                payload_json: row.payload_json,
                token_usage_json: row.token_usage_json,
            })
            .collect();

        Ok(OpenContext {
            found: true,
            event_uid: event_uid.to_string(),
            session_id: target.session_id.clone(),
            target_event_order: target.event_order,
            turn_seq: target.turn_seq,
            before,
            after,
            events,
        })
    }
3082
3083 async fn search_events(&self, query: SearchEventsQuery) -> RepoResult<SearchEventsResult> {
3084 let query_text = query.query.trim();
3085 if query_text.is_empty() {
3086 return Err(RepoError::invalid_argument("query cannot be empty"));
3087 }
3088 let source = query
3089 .source
3090 .as_deref()
3091 .map(str::trim)
3092 .filter(|raw| !raw.is_empty())
3093 .unwrap_or("moraine-conversations");
3094
3095 let query_id = if source == BENCHMARK_REPLAY_SOURCE {
3096 "benchmark-replay".to_string()
3097 } else {
3098 Uuid::new_v4().to_string()
3099 };
3100 let started = Instant::now();
3101
3102 let terms_with_qf = tokenize_query(query_text, self.cfg.bm25_max_query_terms);
3103 if terms_with_qf.is_empty() {
3104 return Err(RepoError::invalid_argument(
3105 "query has no searchable terms (tokens shorter than 2 characters are excluded)",
3106 ));
3107 }
3108 let terms: Vec<String> = terms_with_qf.iter().map(|(term, _)| term.clone()).collect();
3109
3110 let requested_limit = query.limit.unwrap_or(self.cfg.max_results).max(1);
3111 let limit = requested_limit.min(self.cfg.max_results);
3112 let limit_capped = requested_limit > limit;
3113
3114 let min_should_match = query
3115 .min_should_match
3116 .unwrap_or(self.cfg.bm25_default_min_should_match)
3117 .max(1)
3118 .min(terms.len() as u16);
3119
3120 let min_score = query.min_score.unwrap_or(self.cfg.bm25_default_min_score);
3121 let include_tool_events = query
3122 .include_tool_events
3123 .unwrap_or(self.cfg.default_include_tool_events);
3124 let event_kinds = Self::normalize_event_kinds(query.event_kinds)?;
3125 let exclude_codex_mcp = query
3126 .exclude_codex_mcp
3127 .unwrap_or(self.cfg.default_exclude_codex_mcp);
3128 let disable_cache = query.disable_cache.unwrap_or(false);
3129 let effective_strategy = query.search_strategy.unwrap_or_default();
3130
3131 if let Some(session_id) = query.session_id.as_deref() {
3132 Self::validate_session_id(session_id)?;
3133 }
3134 let session_id = query.session_id.as_deref();
3135
3136 let (docs, total_doc_len) = self.corpus_stats().await?;
3137 if docs == 0 {
3138 return Ok(SearchEventsResult {
3139 query_id,
3140 query: query_text.to_string(),
3141 terms,
3142 stats: SearchEventsStats {
3143 docs: 0,
3144 avgdl: 0.0,
3145 took_ms: started.elapsed().as_millis() as u32,
3146 result_count: 0,
3147 requested_limit,
3148 effective_limit: limit,
3149 limit_capped,
3150 },
3151 hits: Vec::new(),
3152 });
3153 }
3154
3155 let avgdl = (total_doc_len as f64 / docs as f64).max(1.0);
3156 let fetch_limit = Self::dedupe_fetch_limit(limit);
3157
3158 let hits = if disable_cache {
3159 let rows = self
3160 .search_events_rows_by_strategy(
3161 effective_strategy,
3162 &terms,
3163 docs,
3164 avgdl,
3165 include_tool_events,
3166 event_kinds.as_deref(),
3167 exclude_codex_mcp,
3168 session_id,
3169 min_should_match,
3170 min_score,
3171 fetch_limit,
3172 )
3173 .await?;
3174 let rows = Self::dedupe_search_rows(rows, limit);
3175 self.map_search_rows_to_hits(rows).await?
3176 } else {
3177 let cache_key = Self::search_events_cache_key(
3178 &terms,
3179 effective_strategy,
3180 include_tool_events,
3181 event_kinds.as_deref(),
3182 exclude_codex_mcp,
3183 session_id,
3184 min_should_match,
3185 min_score,
3186 limit,
3187 );
3188
3189 if let Some(cached_hits) = self.search_events_cache_get(&cache_key).await {
3190 cached_hits
3191 } else {
3192 let fresh_rows = self
3193 .search_events_rows_by_strategy(
3194 effective_strategy,
3195 &terms,
3196 docs,
3197 avgdl,
3198 include_tool_events,
3199 event_kinds.as_deref(),
3200 exclude_codex_mcp,
3201 session_id,
3202 min_should_match,
3203 min_score,
3204 fetch_limit,
3205 )
3206 .await?;
3207 let fresh_rows = Self::dedupe_search_rows(fresh_rows, limit);
3208 let fresh_hits = self.map_search_rows_to_hits(fresh_rows).await?;
3209 self.search_events_cache_put(cache_key, &fresh_hits).await;
3210 fresh_hits
3211 }
3212 };
3213
3214 let took_ms = started.elapsed().as_millis() as u32;
3215
3216 if source != BENCHMARK_REPLAY_SOURCE {
3217 self.log_search_events(
3218 &query_id,
3219 source,
3220 query_text,
3221 session_id.unwrap_or(""),
3222 &terms,
3223 limit,
3224 min_should_match,
3225 min_score,
3226 include_tool_events,
3227 event_kinds.as_deref(),
3228 exclude_codex_mcp,
3229 took_ms,
3230 &hits,
3231 docs,
3232 avgdl,
3233 )
3234 .await;
3235 }
3236
3237 Ok(SearchEventsResult {
3238 query_id,
3239 query: query_text.to_string(),
3240 terms,
3241 stats: SearchEventsStats {
3242 docs,
3243 avgdl,
3244 took_ms,
3245 result_count: hits.len(),
3246 requested_limit,
3247 effective_limit: limit,
3248 limit_capped,
3249 },
3250 hits,
3251 })
3252 }
3253
    /// BM25-style full-text search scored at conversation (session) granularity.
    ///
    /// Pipeline: validate and tokenize the query, load corpus statistics,
    /// derive per-term IDF weights, optionally pre-narrow to candidate
    /// sessions, run the scoring SQL, then enrich hits with snippets and
    /// session metadata.
    ///
    /// # Errors
    /// Returns `RepoError::invalid_argument` for an empty query, a query with
    /// no searchable terms, or invalid time bounds; ClickHouse failures are
    /// translated via `map_backend`.
    async fn search_conversations(
        &self,
        query: ConversationSearchQuery,
    ) -> RepoResult<ConversationSearchResults> {
        let query_text = query.query.trim();
        if query_text.is_empty() {
            return Err(RepoError::invalid_argument("query cannot be empty"));
        }

        Self::validate_time_bounds(query.from_unix_ms, query.to_unix_ms)?;

        // Fresh id per search; also used to correlate any downstream logging.
        let query_id = Uuid::new_v4().to_string();
        let started = Instant::now();

        // Tokenize into (term, query-frequency) pairs, capped by config;
        // only the distinct terms are needed for DF/IDF lookups below.
        let terms_with_qf = tokenize_query(query_text, self.cfg.bm25_max_query_terms);
        if terms_with_qf.is_empty() {
            return Err(RepoError::invalid_argument(
                "query has no searchable terms (tokens shorter than 2 characters are excluded)",
            ));
        }
        let terms: Vec<String> = terms_with_qf.iter().map(|(term, _)| term.clone()).collect();

        // Clamp the requested limit to the configured maximum and record
        // whether it was capped (surfaced in the result stats).
        let requested_limit = query.limit.unwrap_or(self.cfg.max_results).max(1);
        let limit = requested_limit.min(self.cfg.max_results);
        let limit_capped = requested_limit > limit;

        // min_should_match is at least 1 and never exceeds the term count.
        let min_should_match = query
            .min_should_match
            .unwrap_or(self.cfg.bm25_default_min_should_match)
            .max(1)
            .min(terms.len() as u16);

        let min_score = query.min_score.unwrap_or(self.cfg.bm25_default_min_score);
        let include_tool_events = query
            .include_tool_events
            .unwrap_or(self.cfg.default_include_tool_events);
        let exclude_codex_mcp = query
            .exclude_codex_mcp
            .unwrap_or(self.cfg.default_exclude_codex_mcp);

        // Empty corpus: short-circuit with an empty, well-formed result.
        let (docs, total_doc_len) = self.corpus_stats().await?;
        if docs == 0 {
            return Ok(ConversationSearchResults {
                query_id,
                query: query_text.to_string(),
                terms,
                stats: ConversationSearchStats {
                    docs: 0,
                    avgdl: 0.0,
                    took_ms: started.elapsed().as_millis() as u32,
                    result_count: 0,
                    requested_limit,
                    effective_limit: limit,
                    limit_capped,
                },
                hits: Vec::new(),
            });
        }

        // Average document length for BM25 normalization (floored at 1.0).
        let avgdl = (total_doc_len as f64 / docs as f64).max(1.0);
        let df_map = self.df_map(&terms).await?;

        // Smoothed IDF per term. Terms unseen in the corpus (df == 0) get the
        // maximal weight; otherwise the standard BM25 idf with +0.5 smoothing,
        // clamped to be non-negative.
        let mut idf_by_term = HashMap::<String, f64>::new();
        for term in &terms {
            let df = *df_map.get(term).unwrap_or(&0);
            let idf = if df == 0 {
                (1.0 + ((docs as f64 + 0.5) / 0.5)).ln()
            } else {
                let n = docs.max(df) as f64;
                (1.0 + ((n - df as f64 + 0.5) / (df as f64 + 0.5))).ln()
            };
            idf_by_term.insert(term.clone(), idf.max(0.0));
        }

        // Candidate pre-narrowing is best-effort: on failure we fall back to
        // an empty candidate set, which routes the query down the exact path.
        let candidate_set = match self
            .fetch_conversation_candidates(
                &terms,
                &idf_by_term,
                include_tool_events,
                exclude_codex_mcp,
                min_should_match,
                limit,
                query.from_unix_ms,
                query.to_unix_ms,
                query.mode,
            )
            .await
        {
            Ok(set) => set,
            Err(err) => {
                warn!("search_conversations candidate stage failed; falling back to exact path: {err}");
                ConversationCandidateSet::default()
            }
        };
        // Only pin the scoring SQL to an explicit session list when the
        // candidate set is complete: not truncated, non-empty, and below the
        // candidate limit. Otherwise scan without the restriction.
        let candidate_limit = Self::conversation_candidate_limit(limit);
        let candidate_session_ids = if candidate_set.truncated
            || candidate_set.rows.is_empty()
            || candidate_set.rows.len() >= candidate_limit
        {
            None
        } else {
            Some(
                candidate_set
                    .rows
                    .into_iter()
                    .map(|row| row.session_id)
                    .collect::<Vec<_>>(),
            )
        };

        let sql = self.build_search_conversations_sql(
            &terms,
            &idf_by_term,
            avgdl,
            include_tool_events,
            exclude_codex_mcp,
            min_should_match,
            min_score,
            limit,
            query.from_unix_ms,
            query.to_unix_ms,
            query.mode,
            candidate_session_ids.as_deref(),
        )?;

        let rows: Vec<ConversationSearchRow> =
            self.map_backend(self.ch.query_rows(&sql, None).await)?;
        // Batch-fetch snippets for every non-empty best_event_uid, and
        // session metadata for every hit, before assembling the results.
        let best_event_uids = rows
            .iter()
            .filter_map(|row| {
                if row.best_event_uid.is_empty() {
                    None
                } else {
                    Some(row.best_event_uid.clone())
                }
            })
            .collect::<Vec<_>>();
        let snippet_by_event_uid = self.fetch_conversation_snippets(&best_event_uids).await?;
        let session_ids = rows
            .iter()
            .map(|row| row.session_id.clone())
            .collect::<Vec<_>>();
        let session_metadata_by_session_id = self
            .fetch_conversation_session_metadata(&session_ids)
            .await?;

        let hits = rows
            .into_iter()
            .enumerate()
            .map(|(idx, row)| {
                let ConversationSearchRow {
                    session_id,
                    first_event_time,
                    first_event_unix_ms,
                    last_event_time,
                    last_event_unix_ms,
                    provider: row_provider,
                    score,
                    matched_terms,
                    event_count_considered,
                    best_event_uid: row_best_event_uid,
                    snippet: row_snippet,
                } = row;
                let session_metadata = session_metadata_by_session_id.get(&session_id);

                // Empty strings from ClickHouse mean "absent"; map them to None.
                let best_event_uid = if row_best_event_uid.is_empty() {
                    None
                } else {
                    Some(row_best_event_uid)
                };
                // Prefer the freshly fetched snippet; fall back to the row's
                // own snippet column when the lookup found nothing.
                let snippet = best_event_uid
                    .as_ref()
                    .and_then(|event_uid| snippet_by_event_uid.get(event_uid).cloned())
                    .or((!row_snippet.is_empty()).then_some(row_snippet));
                let has_first_event_time = !first_event_time.is_empty();
                let has_last_event_time = !last_event_time.is_empty();
                // Session metadata, when present and non-empty, wins over the
                // per-row provider value.
                let provider = session_metadata
                    .and_then(|meta| (!meta.provider.is_empty()).then(|| meta.provider.clone()))
                    .or((!row_provider.is_empty()).then_some(row_provider));
                let session_slug = session_metadata.and_then(|meta| {
                    (!meta.session_slug.is_empty()).then(|| meta.session_slug.clone())
                });
                let session_summary = session_metadata.and_then(|meta| {
                    (!meta.session_summary.is_empty()).then(|| meta.session_summary.clone())
                });
                ConversationSearchHit {
                    rank: idx + 1,
                    session_id,
                    first_event_time: has_first_event_time.then_some(first_event_time),
                    first_event_unix_ms: has_first_event_time.then_some(first_event_unix_ms),
                    last_event_time: has_last_event_time.then_some(last_event_time),
                    last_event_unix_ms: has_last_event_time.then_some(last_event_unix_ms),
                    provider,
                    session_slug,
                    session_summary,
                    score,
                    matched_terms,
                    event_count_considered,
                    best_event_uid,
                    snippet,
                }
            })
            .collect::<Vec<_>>();

        Ok(ConversationSearchResults {
            query_id,
            query: query_text.to_string(),
            terms,
            stats: ConversationSearchStats {
                docs,
                avgdl,
                took_ms: started.elapsed().as_millis() as u32,
                result_count: hits.len(),
                requested_limit,
                effective_limit: limit,
                limit_capped,
            },
            hits,
        })
    }
3474 }
3475
3476 fn token_re() -> &'static Regex {
3477 static TOKEN_RE: OnceLock<Regex> = OnceLock::new();
3478 TOKEN_RE.get_or_init(|| Regex::new(r"[A-Za-z0-9_]+").expect("valid token regex"))
3479 }
3480
3481 fn safe_value_re() -> &'static Regex {
3482 static SAFE_RE: OnceLock<Regex> = OnceLock::new();
3483 SAFE_RE
3484 .get_or_init(|| Regex::new(r"^[A-Za-z0-9._:@/-]{1,256}$").expect("valid safe-value regex"))
3485 }
3486
3487 fn tokenize_query(text: &str, max_terms: usize) -> Vec<(String, u32)> {
3488 let mut order = Vec::<String>::new();
3489 let mut tf = HashMap::<String, u32>::new();
3490
3491 for mat in token_re().find_iter(text) {
3492 let token = mat.as_str().to_ascii_lowercase();
3493 if token.len() < 2 || token.len() > 64 {
3494 continue;
3495 }
3496
3497 if !tf.contains_key(&token) {
3498 order.push(token.clone());
3499 }
3500 let entry = tf.entry(token).or_insert(0);
3501 *entry += 1;
3502
3503 if order.len() >= max_terms {
3504 break;
3505 }
3506 }
3507
3508 order
3509 .into_iter()
3510 .map(|token| {
3511 let count = *tf.get(&token).unwrap_or(&1);
3512 (token, count)
3513 })
3514 .collect()
3515 }
3516
3517 fn is_safe_filter_value(value: &str) -> bool {
3518 safe_value_re().is_match(value)
3519 }
3520
/// Escapes `value` as a single-quoted SQL string literal: backslashes are
/// doubled first, then embedded single quotes are doubled.
fn sql_quote(value: &str) -> String {
    let escaped = value.replace('\\', "\\\\").replace('\'', "''");
    format!("'{escaped}'")
}
3524
/// Quotes `value` as a backtick-delimited SQL identifier, doubling any
/// embedded backticks to escape them.
fn sql_identifier(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('`');
    for ch in value.chars() {
        if ch == '`' {
            quoted.push_str("``");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('`');
    quoted
}
3528
3529 fn sql_array_strings(items: &[String]) -> String {
3530 let parts = items.iter().map(|item| sql_quote(item)).collect::<Vec<_>>();
3531 format!("[{}]", parts.join(","))
3532 }
3533
/// Renders a slice of floats as a SQL array literal with fixed 12-decimal
/// precision so values serialize deterministically.
fn sql_array_f64(items: &[f64]) -> String {
    let rendered: Vec<String> = items.iter().map(|v| format!("{v:.12}")).collect();
    format!("[{}]", rendered.join(","))
}
3541
// Unit tests for the pure helpers in this file: query tokenization, SQL
// escaping, search-doc filtering, event-kind normalization, search-row
// deduplication, and the open-context noise filters.
#[cfg(test)]
mod tests {
    use super::*;

    // Baseline, fully populated cache entry for the passes_search_doc_filters
    // tests; individual tests tweak one field at a time.
    fn sample_search_doc() -> SearchDocExtraCacheEntry {
        SearchDocExtraCacheEntry {
            session_id: "session-1".to_string(),
            source_name: "source".to_string(),
            provider: "provider".to_string(),
            event_class: "message".to_string(),
            payload_type: "message".to_string(),
            actor_role: "assistant".to_string(),
            name: "tool".to_string(),
            phase: "".to_string(),
            source_ref: "source-ref".to_string(),
            doc_len: 42,
            text_preview: "preview".to_string(),
            has_codex_mcp: 0,
            fetched_at: Instant::now(),
        }
    }

    // Convenience constructor for SearchRow fixtures used by the dedupe tests.
    fn sample_search_row(
        event_uid: &str,
        session_id: &str,
        event_class: &str,
        payload_type: &str,
        actor_role: &str,
        text_preview: &str,
        score: f64,
        matched_terms: u64,
    ) -> SearchRow {
        SearchRow {
            event_uid: event_uid.to_string(),
            session_id: session_id.to_string(),
            source_name: "source".to_string(),
            provider: "provider".to_string(),
            event_class: event_class.to_string(),
            payload_type: payload_type.to_string(),
            actor_role: actor_role.to_string(),
            name: String::new(),
            phase: String::new(),
            source_ref: "source-ref".to_string(),
            doc_len: 42,
            text_preview: text_preview.to_string(),
            score,
            matched_terms,
        }
    }

    // Repeated tokens increase the query frequency; the distinct-term cap
    // stops the scan at max_terms terms.
    #[test]
    fn tokenize_query_enforces_limits_and_counts() {
        let terms = tokenize_query("Hello hello world tool_use", 3);
        assert_eq!(terms.len(), 3);
        assert_eq!(terms[0], ("hello".to_string(), 2));
        assert_eq!(terms[1].0, "world");
    }

    // The allow-list accepts typical ids/paths and rejects SQL punctuation.
    #[test]
    fn safe_filter_value_validation() {
        assert!(is_safe_filter_value("session_123"));
        assert!(is_safe_filter_value("a/b.c:d@e-1"));
        assert!(!is_safe_filter_value("drop table;"));
    }

    // Embedded single quotes must come out doubled inside the array literal.
    #[test]
    fn sql_array_builders_escape_values() {
        let values = vec!["a".to_string(), "b'c".to_string()];
        let out = sql_array_strings(&values);
        assert!(out.contains("'a'"));
        assert!(out.contains("'b''c'"));
    }

    // has_codex_mcp = 1 is filtered when exclude_codex_mcp is set.
    #[test]
    fn search_doc_filters_exclude_codex_by_flag() {
        let mut row = sample_search_doc();
        row.has_codex_mcp = 1;
        assert!(
            !ClickHouseConversationRepository::passes_search_doc_filters(
                &row, false, None, true, None
            )
        );
    }

    // A codex-MCP-ish tool name ("search") is filtered even without the flag
    // on the row itself.
    #[test]
    fn search_doc_filters_exclude_codex_by_tool_name() {
        let mut row = sample_search_doc();
        row.name = "search".to_string();
        assert!(
            !ClickHouseConversationRepository::passes_search_doc_filters(
                &row, false, None, true, None
            )
        );
    }

    // An explicit event_kinds list takes precedence over include_tool_events:
    // tool_result passes when requested even with the toggle off, and is
    // rejected when only Message is requested even with the toggle on.
    #[test]
    fn search_doc_filters_event_kinds_override_include_tool_toggle() {
        let mut row = sample_search_doc();
        row.event_class = "tool_result".to_string();
        row.payload_type = "tool_result".to_string();

        assert!(ClickHouseConversationRepository::passes_search_doc_filters(
            &row,
            false,
            Some(&[SearchEventKind::ToolResult]),
            false,
            None
        ));
        assert!(
            !ClickHouseConversationRepository::passes_search_doc_filters(
                &row,
                true,
                Some(&[SearchEventKind::Message]),
                false,
                None
            )
        );
    }

    // event_msg/agent_reasoning rows map to the Reasoning kind, not Message.
    #[test]
    fn search_doc_filters_map_event_msg_reasoning() {
        let mut row = sample_search_doc();
        row.event_class = "event_msg".to_string();
        row.payload_type = "agent_reasoning".to_string();

        assert!(ClickHouseConversationRepository::passes_search_doc_filters(
            &row,
            true,
            Some(&[SearchEventKind::Reasoning]),
            false,
            None
        ));
        assert!(
            !ClickHouseConversationRepository::passes_search_doc_filters(
                &row,
                true,
                Some(&[SearchEventKind::Message]),
                false,
                None
            )
        );
    }

    // Some(vec![]) is treated as an invalid argument rather than "match none".
    #[test]
    fn normalize_event_kinds_rejects_empty_lists() {
        let result = ClickHouseConversationRepository::normalize_event_kinds(Some(vec![]));
        assert!(result.is_err());
    }

    // Normalization produces a sorted, duplicate-free kind list.
    #[test]
    fn normalize_event_kinds_sorts_and_deduplicates() {
        let normalized = ClickHouseConversationRepository::normalize_event_kinds(Some(vec![
            SearchEventKind::ToolResult,
            SearchEventKind::Message,
            SearchEventKind::ToolResult,
        ]))
        .expect("normalize should succeed")
        .expect("normalized kinds should be present");

        assert_eq!(
            normalized,
            vec![SearchEventKind::Message, SearchEventKind::ToolResult]
        );
    }

    // When a message and its event_msg mirror tie on score/terms within a
    // session, the canonical message row is kept. Note the previews differ
    // only in whitespace, so normalization must be part of the mirror match.
    #[test]
    fn dedupe_search_rows_prefers_message_over_event_msg_mirror() {
        let rows = vec![
            sample_search_row(
                "uid-event-msg",
                "sess-a",
                "event_msg",
                "agent_message",
                "assistant",
                "Short answer: no",
                18.26,
                3,
            ),
            sample_search_row(
                "uid-message",
                "sess-a",
                "message",
                "message",
                "assistant",
                "Short answer:\nno",
                18.26,
                3,
            ),
        ];

        let deduped = ClickHouseConversationRepository::dedupe_search_rows(rows, 5);
        assert_eq!(deduped.len(), 1);
        assert_eq!(deduped[0].event_uid, "uid-message");
        assert_eq!(deduped[0].event_class, "message");
    }

    // After collapsing a mirror pair, later rows still fill up to the limit.
    #[test]
    fn dedupe_search_rows_fills_limit_after_collapsing_mirrors() {
        let rows = vec![
            sample_search_row(
                "uid-event-msg",
                "sess-a",
                "event_msg",
                "agent_message",
                "assistant",
                "same answer",
                18.26,
                3,
            ),
            sample_search_row(
                "uid-message",
                "sess-a",
                "message",
                "message",
                "assistant",
                "same answer",
                18.26,
                3,
            ),
            sample_search_row(
                "uid-2",
                "sess-b",
                "message",
                "message",
                "assistant",
                "different answer 2",
                17.00,
                2,
            ),
            sample_search_row(
                "uid-3",
                "sess-c",
                "message",
                "message",
                "assistant",
                "different answer 3",
                16.00,
                2,
            ),
        ];

        let deduped = ClickHouseConversationRepository::dedupe_search_rows(rows, 3);
        assert_eq!(deduped.len(), 3);
        assert_eq!(deduped[0].event_uid, "uid-message");
        assert_eq!(deduped[1].event_uid, "uid-2");
        assert_eq!(deduped[2].event_uid, "uid-3");
    }

    // Two genuine rows of the same kind are never collapsed, even with
    // identical text/score — dedupe only targets mirror pairs.
    #[test]
    fn dedupe_search_rows_does_not_collapse_same_kind_hits() {
        let rows = vec![
            sample_search_row(
                "uid-1",
                "sess-a",
                "message",
                "message",
                "assistant",
                "same text",
                10.0,
                2,
            ),
            sample_search_row(
                "uid-2",
                "sess-a",
                "message",
                "message",
                "assistant",
                "same text",
                10.0,
                2,
            ),
        ];

        let deduped = ClickHouseConversationRepository::dedupe_search_rows(rows, 5);
        assert_eq!(deduped.len(), 2);
    }

    // The reasoning/event_msg(agent_reasoning) mirror pair collapses to the
    // canonical reasoning row, symmetric with the message case above.
    #[test]
    fn dedupe_search_rows_prefers_reasoning_over_event_msg_reasoning_mirror() {
        let rows = vec![
            sample_search_row(
                "uid-event-msg-reasoning",
                "sess-a",
                "event_msg",
                "agent_reasoning",
                "assistant",
                "Let me think about this",
                12.50,
                2,
            ),
            sample_search_row(
                "uid-reasoning",
                "sess-a",
                "reasoning",
                "reasoning",
                "assistant",
                "Let me think about this",
                12.50,
                2,
            ),
        ];

        let deduped = ClickHouseConversationRepository::dedupe_search_rows(rows, 5);
        assert_eq!(deduped.len(), 1);
        assert_eq!(deduped[0].event_uid, "uid-reasoning");
        assert_eq!(deduped[0].event_class, "reasoning");
    }

    // A reasoning row and a message row never mirror each other, even with
    // identical text.
    #[test]
    fn dedupe_search_rows_reasoning_mirrors_do_not_collapse_with_messages() {
        let rows = vec![
            sample_search_row(
                "uid-reasoning",
                "sess-a",
                "reasoning",
                "reasoning",
                "assistant",
                "same text",
                10.0,
                2,
            ),
            sample_search_row(
                "uid-message",
                "sess-a",
                "message",
                "message",
                "assistant",
                "same text",
                10.0,
                2,
            ),
        ];

        let deduped = ClickHouseConversationRepository::dedupe_search_rows(rows, 5);
        assert_eq!(deduped.len(), 2);
    }

    // System-role noise (progress, file_history_snapshot, system) is
    // classified as low-information; actor_role matching is case-insensitive
    // and non-system roles or substantive payloads are kept.
    #[test]
    fn low_information_system_event_classifier_targets_open_noise() {
        assert!(
            ClickHouseConversationRepository::is_low_information_system_event("system", "progress")
        );
        assert!(
            ClickHouseConversationRepository::is_low_information_system_event(
                "SYSTEM",
                "file_history_snapshot"
            )
        );
        assert!(
            ClickHouseConversationRepository::is_low_information_system_event("system", "system")
        );
        assert!(
            !ClickHouseConversationRepository::is_low_information_system_event(
                "assistant",
                "progress"
            )
        );
        assert!(
            !ClickHouseConversationRepository::is_low_information_system_event(
                "system",
                "reasoning"
            )
        );
    }

    // include_system_events=true yields no filter clause; false yields a SQL
    // predicate that screens out the same system noise as the classifier.
    #[test]
    fn open_context_filter_clause_respects_include_system_events_flag() {
        assert_eq!(
            ClickHouseConversationRepository::open_context_filter_clause(true),
            ""
        );
        let filtered_clause = ClickHouseConversationRepository::open_context_filter_clause(false);
        assert!(filtered_clause.contains("progress"));
        assert!(filtered_clause.contains("file_history_snapshot"));
        assert!(filtered_clause.contains("lowerUTF8(actor_role) = 'system'"));
    }
}
3918 }