// crates/moraine-conversations/src/clickhouse_repo.rs
     1 use std::sync::{Arc, OnceLock};
     2 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
     3  
     4 use ahash::AHashMap as HashMap;
     5 use anyhow::Result as AnyResult;
     6 use async_trait::async_trait;
     7 use moraine_clickhouse::ClickHouseClient;
     8 use regex::Regex;
     9 use serde::Deserialize;
    10 use serde_json::{json, Value};
    11 use tokio::sync::RwLock;
    12 use tracing::warn;
    13 use uuid::Uuid;
    14  
    15 use crate::cursor::{decode_cursor, encode_cursor, ConversationCursor, TurnCursor};
    16 use crate::domain::{
    17     Conversation, ConversationDetailOptions, ConversationListFilter, ConversationMode,
    18     ConversationSearchHit, ConversationSearchQuery, ConversationSearchResults,
    19     ConversationSearchStats, ConversationSummary, OpenContext, OpenEvent, OpenEventRequest, Page,
    20     PageRequest, RepoConfig, SearchEventHit, SearchEventKind, SearchEventsQuery,
    21     SearchEventsResult, SearchEventsStats, SearchEventsStrategy, TraceEvent, Turn, TurnListFilter,
    22     TurnSummary,
    23 };
    24 use crate::error::{RepoError, RepoResult};
    25 use crate::repo::ConversationRepository;
    26  
/// ClickHouse-backed conversation repository with several layered in-memory
/// caches (search results, per-term postings, per-document metadata, and
/// corpus/term statistics) guarded by async `RwLock`s.
#[derive(Clone)]
pub struct ClickHouseConversationRepository {
    ch: ClickHouseClient,
    cfg: RepoConfig,
    // Corpus stats, per-term document frequencies, and schema-probe results.
    stats_cache: Arc<RwLock<SearchStatsCache>>,
    // Finished search result sets keyed by the query signature string.
    search_cache: Arc<RwLock<HashMap<String, SearchEventsCacheEntry>>>,
    // Posting lists keyed by term.
    term_postings_cache: Arc<RwLock<HashMap<String, TermPostingsCacheEntry>>>,
    // Per-event metadata keyed by event_uid, used to hydrate search hits.
    search_doc_extra_cache: Arc<RwLock<HashMap<String, SearchDocExtraCacheEntry>>>,
}
    36  
// Source filter attached to prewarm search traffic (see the prewarm methods).
const BENCHMARK_REPLAY_SOURCE: &str = "benchmark-replay";
// TTLs and capacity bounds for the repository's in-memory caches.
const CORPUS_STATS_CACHE_TTL: Duration = Duration::from_secs(30);
const TERM_DF_CACHE_TTL: Duration = Duration::from_secs(300);
const SEARCH_SCHEMA_CACHE_TTL: Duration = Duration::from_secs(60);
const SEARCH_RESULT_CACHE_TTL: Duration = Duration::from_secs(15);
const SEARCH_RESULT_CACHE_MAX_ENTRIES: usize = 256;
const TERM_POSTINGS_CACHE_TTL: Duration = Duration::from_secs(15);
const TERM_POSTINGS_CACHE_MAX_ENTRIES: usize = 2048;
const TERM_POSTINGS_CACHE_MAX_ROWS_PER_TERM: usize = 131_072;
const TERM_POSTINGS_CACHE_MAX_ROWS_TOTAL: usize = 262_144;
const SEARCH_DOC_EXTRA_CACHE_TTL: Duration = Duration::from_secs(15);
const SEARCH_DOC_EXTRA_CACHE_MAX_ENTRIES: usize = 65536;
// Candidate-set sizing for conversation-level search.
// NOTE(review): the consuming queries are outside this chunk — confirm usage there.
const CONVERSATION_CANDIDATE_MIN: usize = 512;
const CONVERSATION_CANDIDATE_MULTIPLIER: usize = 80;
const CONVERSATION_CANDIDATE_MAX: usize = 20_000;
const CONVERSATION_RECENT_WINDOW_MS: i64 = 45_000;
const CONVERSATION_RECENT_CANDIDATE_LIMIT: usize = 1024;
    54  
/// JSONEachRow shape returned by the session-summary query
/// (see `load_conversation_summary`); mapped into `ConversationSummary`.
#[derive(Debug, Clone, Deserialize)]
struct ConversationSummaryRow {
    session_id: String,
    first_event_time: String,
    first_event_unix_ms: i64,
    last_event_time: String,
    last_event_unix_ms: i64,
    total_turns: u32,
    total_events: u64,
    user_messages: u64,
    assistant_messages: u64,
    tool_calls: u64,
    tool_results: u64,
    // Raw mode string; decoded by `parse_mode` (defaults to "chat").
    mode: String,
}
    70  
/// JSONEachRow shape of one turn from `v_turn_summary`
/// (see `load_turns_for_session`); mapped into `TurnSummary`.
#[derive(Debug, Clone, Deserialize)]
struct TurnSummaryRow {
    session_id: String,
    turn_seq: u32,
    turn_id: String,
    started_at: String,
    started_at_unix_ms: i64,
    ended_at: String,
    ended_at_unix_ms: i64,
    total_events: u64,
    user_messages: u64,
    assistant_messages: u64,
    tool_calls: u64,
    tool_results: u64,
    reasoning_items: u64,
}
    87  
/// JSONEachRow shape of a single trace event; mapped 1:1 into the public
/// `TraceEvent` domain type by `map_trace_event`.
#[derive(Debug, Deserialize)]
struct TraceEventRow {
    session_id: String,
    event_uid: String,
    event_order: u64,
    turn_seq: u32,
    event_time: String,
    actor_role: String,
    event_class: String,
    payload_type: String,
    call_id: String,
    name: String,
    phase: String,
    item_id: String,
    source_ref: String,
    text_content: String,
    // Raw JSON payloads kept as strings; parsing is left to callers.
    payload_json: String,
    token_usage_json: String,
}
   107  
/// Locator row identifying a specific event within a session
/// (consumed by open/context queries outside this chunk).
#[derive(Debug, Deserialize)]
struct OpenTargetRow {
    session_id: String,
    event_order: u64,
    turn_seq: u32,
}
   114  
/// One scored search hit as returned by the SQL-side search path.
#[derive(Debug, Clone, Deserialize)]
struct SearchRow {
    event_uid: String,
    session_id: String,
    source_name: String,
    provider: String,
    event_class: String,
    payload_type: String,
    actor_role: String,
    name: String,
    phase: String,
    source_ref: String,
    doc_len: u32,
    text_preview: String,
    score: f64,
    // Number of distinct query terms this document matched.
    matched_terms: u64,
}
   132  
/// Posting entry stored in the per-term postings cache:
/// document id, its length, and the term frequency within it.
#[derive(Debug, Clone, Deserialize)]
struct CachedPostingRow {
    event_uid: String,
    doc_len: u32,
    tf: u16,
}
   139  
/// Posting entry as fetched from ClickHouse for multiple terms at once;
/// carries the `term` so rows can be bucketed into per-term cache entries.
#[derive(Debug, Clone, Deserialize)]
struct FetchedPostingRow {
    term: String,
    event_uid: String,
    doc_len: u32,
    tf: u16,
}
   147  
/// Per-document metadata fetched to hydrate in-memory-scored search hits;
/// mirrored by `SearchDocExtraCacheEntry`.
#[derive(Debug, Clone, Deserialize)]
struct SearchDocExtraRow {
    event_uid: String,
    session_id: String,
    source_name: String,
    provider: String,
    event_class: String,
    payload_type: String,
    actor_role: String,
    name: String,
    phase: String,
    source_ref: String,
    doc_len: u32,
    text_preview: String,
    // 0/1 flag column; presumably marks codex-mcp-originated docs — confirm schema.
    has_codex_mcp: u8,
}
   164  
/// Aggregate corpus counters (document count and summed document length)
/// used by `corpus_stats`.
#[derive(Debug, Deserialize)]
struct CorpusStatsRow {
    docs: u64,
    total_doc_len: u64,
}
   170  
/// Document frequency for a single term (feeds the term-df cache).
#[derive(Debug, Deserialize)]
struct DfRow {
    term: String,
    df: u64,
}
   176  
/// Raw query text row consumed by `load_hot_queries_for_prewarm`.
#[derive(Debug, Deserialize)]
struct HotQueryRow {
    raw_query: String,
}
   181  
/// Conversation-level search result row. Several fields are `#[serde(default)]`
/// because not every query variant selects them.
#[derive(Debug, Deserialize)]
struct ConversationSearchRow {
    session_id: String,
    #[serde(default)]
    first_event_time: String,
    #[serde(default)]
    first_event_unix_ms: i64,
    #[serde(default)]
    last_event_time: String,
    #[serde(default)]
    last_event_unix_ms: i64,
    #[serde(default)]
    provider: String,
    score: f64,
    matched_terms: u16,
    // How many events contributed to this session's score.
    event_count_considered: u32,
    best_event_uid: String,
    #[serde(default)]
    snippet: String,
}
   202  
/// Optional descriptive metadata for a session (slug/summary may be absent,
/// hence the serde defaults).
#[derive(Debug, Deserialize)]
struct ConversationSessionMetadataRow {
    session_id: String,
    #[serde(default)]
    provider: String,
    #[serde(default)]
    session_slug: String,
    #[serde(default)]
    session_summary: String,
}
   213  
/// Snippet text keyed by the event it was extracted from.
#[derive(Debug, Deserialize)]
struct ConversationSnippetRow {
    event_uid: String,
    snippet: String,
}
   219  
/// First/last event timestamps for a session, as returned by ClickHouse.
#[derive(Debug, Deserialize)]
struct SessionTimeBoundsRow {
    session_id: String,
    first_event_time: String,
    last_event_time: String,
}
   226  
/// Candidate session produced by the conversation-search candidate query.
#[derive(Debug, Deserialize)]
struct ConversationCandidateRow {
    session_id: String,
    score: f64,
    matched_terms: u16,
}
   233  
/// Candidate rows plus a flag noting whether the set hit a size cap
/// and was truncated.
#[derive(Debug, Default)]
struct ConversationCandidateSet {
    rows: Vec<ConversationCandidateRow>,
    truncated: bool,
}
   239  
/// Owned copy of a session's first/last event timestamps
/// (keyed externally by session_id).
#[derive(Debug, Clone, Default)]
struct SessionTimeBounds {
    first_event_time: String,
    last_event_time: String,
}
   245  
/// 0/1 result of a schema probe (used for the `has_codex_flag_column` check
/// cached in `SearchStatsCache`).
#[derive(Debug, Deserialize)]
struct ColumnExistsRow {
    exists: u8,
}
   250  
/// Cached corpus totals with the fetch instant for TTL checks
/// (`CORPUS_STATS_CACHE_TTL`).
#[derive(Debug, Clone)]
struct CorpusStatsCacheEntry {
    docs: u64,
    total_doc_len: u64,
    fetched_at: Instant,
}
   257  
/// Cached document frequency for one term (TTL: `TERM_DF_CACHE_TTL`).
#[derive(Debug, Clone)]
struct TermDfCacheEntry {
    df: u64,
    fetched_at: Instant,
}
   263  
/// Aggregated statistics cache shared behind one lock: corpus totals,
/// per-term document frequencies, and the schema-probe result for the
/// codex flag column (value plus probe time).
#[derive(Debug, Default)]
struct SearchStatsCache {
    corpus_stats: Option<CorpusStatsCacheEntry>,
    term_df_by_term: HashMap<String, TermDfCacheEntry>,
    has_codex_flag_column: Option<(bool, Instant)>,
}
   270  
/// Cached, fully materialized search result set
/// (TTL: `SEARCH_RESULT_CACHE_TTL`).
#[derive(Debug, Clone)]
struct SearchEventsCacheEntry {
    hits: Vec<SearchEventHit>,
    fetched_at: Instant,
}
   276  
/// Cached posting list for one term. `Arc<[_]>` makes clones O(1)
/// (refcount bump, rows shared).
#[derive(Debug, Clone)]
struct TermPostingsCacheEntry {
    rows: Arc<[CachedPostingRow]>,
    fetched_at: Instant,
}
   282  
/// Cached per-document metadata; field-for-field mirror of
/// `SearchDocExtraRow` plus the fetch instant for TTL checks.
#[derive(Debug, Clone)]
struct SearchDocExtraCacheEntry {
    session_id: String,
    source_name: String,
    provider: String,
    event_class: String,
    payload_type: String,
    actor_role: String,
    name: String,
    phase: String,
    source_ref: String,
    doc_len: u32,
    text_preview: String,
    has_codex_mcp: u8,
    fetched_at: Instant,
}
   299  
/// Per-document scoring accumulator used by the in-memory search path.
/// `matched_mask` is a bitmask — presumably one bit per query term; confirm
/// at the scoring site (outside this chunk).
#[derive(Debug, Clone, Copy)]
struct SearchScoreAccum<'a> {
    row: &'a CachedPostingRow,
    score: f64,
    matched_mask: u64,
}
   306  
   307 impl ClickHouseConversationRepository {
    /// Builds a repository over an existing ClickHouse client with all
    /// in-memory caches starting empty.
    pub fn new(ch: ClickHouseClient, cfg: RepoConfig) -> Self {
        Self {
            ch,
            cfg,
            stats_cache: Arc::new(RwLock::new(SearchStatsCache::default())),
            search_cache: Arc::new(RwLock::new(HashMap::new())),
            term_postings_cache: Arc::new(RwLock::new(HashMap::new())),
            search_doc_extra_cache: Arc::new(RwLock::new(HashMap::new())),
        }
    }
   318  
    /// Returns the repository configuration this instance was built with.
    pub fn config(&self) -> &RepoConfig {
        &self.cfg
    }
   322  
    /// Runs each non-blank query through `search_events` (optimized strategy,
    /// `benchmark-replay` source, cache enabled) purely to warm caches.
    /// Failures are logged and skipped — prewarm is best-effort.
    async fn run_mcp_search_prewarm_queries(
        &self,
        queries: impl IntoIterator<Item = String>,
        limit: u16,
    ) {
        for query in queries {
            if query.trim().is_empty() {
                continue;
            }
            if let Err(err) = self
                .search_events(SearchEventsQuery {
                    query,
                    source: Some(BENCHMARK_REPLAY_SOURCE.to_string()),
                    limit: Some(limit),
                    session_id: None,
                    min_score: None,
                    min_should_match: None,
                    include_tool_events: None,
                    event_kinds: None,
                    exclude_codex_mcp: None,
                    disable_cache: Some(false),
                    search_strategy: Some(SearchEventsStrategy::Optimized),
                })
                .await
            {
                warn!("mcp prewarm query failed: {}", err);
            }
        }
    }
   352  
    /// Minimal, deterministic prewarm for the synchronous initialize path:
    /// one fixed query at two fixed limits. The variable hot-query prewarm
    /// lives in `prewarm_mcp_search_state` and runs in the background.
    pub async fn prewarm_mcp_search_state_quick(&self) -> RepoResult<()> {
        // Keep synchronous initialize prewarm deterministic and bounded.
        // Variable hot-query prewarm stays in the async background path.
        const PREWARM_QUERY: &str = "the";
        const PREWARM_LIMITS: [u16; 2] = [1, 25];

        for limit in PREWARM_LIMITS {
            if let Err(err) = self
                .search_events(SearchEventsQuery {
                    query: PREWARM_QUERY.to_string(),
                    source: Some(BENCHMARK_REPLAY_SOURCE.to_string()),
                    limit: Some(limit),
                    session_id: None,
                    min_score: None,
                    min_should_match: None,
                    include_tool_events: None,
                    event_kinds: None,
                    exclude_codex_mcp: None,
                    disable_cache: Some(false),
                    search_strategy: Some(SearchEventsStrategy::Optimized),
                })
                .await
            {
                warn!("mcp quick prewarm query failed: {}", err);
            }
        }

        Ok(())
    }
   382  
   383     pub async fn prewarm_mcp_search_state(&self) -> RepoResult<()> {
   384         const PREWARM_QUERY_LIMIT: u16 = 10;
   385         const PREWARM_HOT_QUERY_COUNT: usize = 6;
   386         const PREWARM_FALLBACK_QUERIES: [&str; 5] = [
   387             "the",
   388             "error",
   389             "test",
   390             "file directory path config",
   391             "function code implementation",
   392         ];
   393  
   394         let mut queries = self
   395             .load_hot_queries_for_prewarm(PREWARM_HOT_QUERY_COUNT)
   396             .await?;
   397         for fallback in PREWARM_FALLBACK_QUERIES {
   398             if !queries.iter().any(|existing| existing == fallback) {
   399                 queries.push(fallback.to_string());
   400             }
   401         }
   402  
   403         self.run_mcp_search_prewarm_queries(queries, PREWARM_QUERY_LIMIT)
   404             .await;
   405  
   406         Ok(())
   407     }
   408  
    /// Returns a fully qualified `database.table` reference with both parts
    /// escaped via `sql_identifier`.
    fn table_ref(&self, table: &str) -> String {
        format!(
            "{}.{}",
            sql_identifier(&self.ch.config().database),
            sql_identifier(table)
        )
    }
   416  
   417     fn map_backend<T>(&self, result: AnyResult<T>) -> RepoResult<T> {
   418         result.map_err(|err| RepoError::backend(err.to_string()))
   419     }
   420  
   421     fn search_events_cache_key(
   422         terms: &[String],
   423         search_strategy: SearchEventsStrategy,
   424         include_tool_events: bool,
   425         event_kinds: Option<&[SearchEventKind]>,
   426         exclude_codex_mcp: bool,
   427         session_id: Option<&str>,
   428         min_should_match: u16,
   429         min_score: f64,
   430         limit: u16,
   431     ) -> String {
   432         let mut cache_terms = terms.to_vec();
   433         cache_terms.sort_unstable();
   434         let event_kind_sig = event_kinds
   435             .map(|kinds| {
   436                 kinds
   437                     .iter()
   438                     .map(|kind| kind.as_str())
   439                     .collect::<Vec<_>>()
   440                     .join(",")
   441             })
   442             .unwrap_or_default();
   443         format!(
   444             "strategy={};incl_tools={include_tool_events};event_kinds={event_kind_sig};excl_codex={exclude_codex_mcp};session={};msm={min_should_match};min_score={min_score:.12};limit={limit};terms={}",
   445             search_strategy.as_str(),
   446             session_id.unwrap_or(""),
   447             cache_terms.join(",")
   448         )
   449     }
   450  
   451     async fn search_events_cache_get(&self, key: &str) -> Option<Vec<SearchEventHit>> {
   452         let now = Instant::now();
   453         {
   454             let cache = self.search_cache.read().await;
   455             if let Some(entry) = cache.get(key) {
   456                 if now.duration_since(entry.fetched_at) <= SEARCH_RESULT_CACHE_TTL {
   457                     return Some(entry.hits.clone());
   458                 }
   459             } else {
   460                 return None;
   461             }
   462         }
   463  
   464         let mut cache = self.search_cache.write().await;
   465         if let Some(entry) = cache.get(key) {
   466             if now.duration_since(entry.fetched_at) <= SEARCH_RESULT_CACHE_TTL {
   467                 return Some(entry.hits.clone());
   468             }
   469         }
   470         cache.remove(key);
   471         None
   472     }
   473  
   474     async fn search_events_cache_put(&self, key: String, hits: &[SearchEventHit]) {
   475         let now = Instant::now();
   476         let mut cache = self.search_cache.write().await;
   477         cache.retain(|_, entry| now.duration_since(entry.fetched_at) <= SEARCH_RESULT_CACHE_TTL);
   478  
   479         if cache.len() >= SEARCH_RESULT_CACHE_MAX_ENTRIES {
   480             if let Some(oldest_key) = cache
   481                 .iter()
   482                 .min_by_key(|(_, entry)| entry.fetched_at)
   483                 .map(|(k, _)| k.clone())
   484             {
   485                 cache.remove(&oldest_key);
   486             }
   487         }
   488  
   489         cache.insert(
   490             key,
   491             SearchEventsCacheEntry {
   492                 hits: hits.to_vec(),
   493                 fetched_at: now,
   494             },
   495         );
   496     }
   497  
    /// Builds the per-session mode classification subquery. Priority order in
    /// the `multiIf`: web_search > mcp_internal > tool_calling > chat.
    fn mode_subquery(&self) -> String {
        let events_table = self.table_ref("events");
        format!(
            "SELECT
  session_id,
  multiIf(
    countIf(
      payload_type = 'web_search_call'
      OR payload_type = 'search_results_received'
      OR (payload_type = 'tool_use' AND tool_name IN ('WebSearch', 'WebFetch'))
    ) > 0,
    'web_search',
    countIf(source_name = 'codex-mcp' OR lowerUTF8(tool_name) IN ('search', 'open')) > 0,
    'mcp_internal',
    countIf(event_kind IN ('tool_call', 'tool_result') OR payload_type = 'tool_use') > 0,
    'tool_calling',
    'chat'
  ) AS mode
FROM {events_table}
GROUP BY session_id"
        )
    }
   520  
    /// Decodes the mode string produced by `mode_subquery`; anything
    /// unrecognized falls back to `Chat`.
    fn parse_mode(raw: &str) -> ConversationMode {
        match raw {
            "web_search" => ConversationMode::WebSearch,
            "mcp_internal" => ConversationMode::McpInternal,
            "tool_calling" => ConversationMode::ToolCalling,
            _ => ConversationMode::Chat,
        }
    }
   529  
    /// Deterministic signature of a conversation list filter (time bounds and
    /// mode), suitable for use as part of a cache key.
    fn conversation_filter_sig(filter: &ConversationListFilter) -> String {
        format!(
            "from={:?};to={:?};mode={}",
            filter.from_unix_ms,
            filter.to_unix_ms,
            filter
                .mode
                .map(ConversationMode::as_str)
                .unwrap_or("__none__")
        )
    }
   541  
    /// Deterministic signature of a turn list filter scoped to one session.
    fn turn_filter_sig(session_id: &str, filter: &TurnListFilter) -> String {
        format!(
            "session={};from={:?};to={:?}",
            session_id, filter.from_turn_seq, filter.to_turn_seq
        )
    }
   548  
   549     fn validate_time_bounds(from_unix_ms: Option<i64>, to_unix_ms: Option<i64>) -> RepoResult<()> {
   550         if let (Some(from), Some(to)) = (from_unix_ms, to_unix_ms) {
   551             if from >= to {
   552                 return Err(RepoError::invalid_argument(
   553                     "from_unix_ms must be strictly less than to_unix_ms",
   554                 ));
   555             }
   556         }
   557         Ok(())
   558     }
   559  
   560     fn validate_session_id(session_id: &str) -> RepoResult<()> {
   561         if !is_safe_filter_value(session_id) {
   562             return Err(RepoError::invalid_argument(
   563                 "session_id contains unsupported characters",
   564             ));
   565         }
   566         Ok(())
   567     }
   568  
   569     fn validate_event_uid(event_uid: &str) -> RepoResult<()> {
   570         if !is_safe_filter_value(event_uid) {
   571             return Err(RepoError::invalid_argument(
   572                 "event_uid contains unsupported characters",
   573             ));
   574         }
   575         Ok(())
   576     }
   577  
   578     fn normalize_event_kinds(
   579         event_kinds: Option<Vec<SearchEventKind>>,
   580     ) -> RepoResult<Option<Vec<SearchEventKind>>> {
   581         let Some(mut kinds) = event_kinds else {
   582             return Ok(None);
   583         };
   584  
   585         if kinds.is_empty() {
   586             return Err(RepoError::invalid_argument(
   587                 "event_kind filter cannot be an empty list",
   588             ));
   589         }
   590  
   591         kinds.sort_unstable();
   592         kinds.dedup();
   593         Ok(Some(kinds))
   594     }
   595  
    /// Returns whether an event (by `event_class`/`payload_type`) belongs to
    /// `kind`. Each kind matches either its dedicated event class or the
    /// equivalent `event_msg` payload types; this is the in-memory twin of
    /// `single_event_kind_clause` and must stay in sync with it.
    fn matches_event_kind(event_class: &str, payload_type: &str, kind: SearchEventKind) -> bool {
        match kind {
            SearchEventKind::Message => {
                event_class == "message"
                    || (event_class == "event_msg"
                        && matches!(
                            payload_type,
                            "user_message" | "agent_message" | "message" | "text"
                        ))
            }
            SearchEventKind::Reasoning => {
                event_class == "reasoning"
                    || (event_class == "event_msg" && payload_type == "agent_reasoning")
            }
            SearchEventKind::ToolCall => {
                event_class == "tool_call"
                    || (event_class == "event_msg"
                        && matches!(
                            payload_type,
                            "tool_use" | "function_call" | "custom_tool_call" | "web_search_call"
                        ))
            }
            SearchEventKind::ToolResult => {
                event_class == "tool_result"
                    || (event_class == "event_msg"
                        && matches!(
                            payload_type,
                            "tool_result"
                                | "function_call_output"
                                | "custom_tool_call_output"
                                | "search_results_received"
                        ))
            }
        }
    }
   631  
   632     fn matches_requested_event_kinds(
   633         event_class: &str,
   634         payload_type: &str,
   635         event_kinds: &[SearchEventKind],
   636     ) -> bool {
   637         event_kinds
   638             .iter()
   639             .any(|kind| Self::matches_event_kind(event_class, payload_type, *kind))
   640     }
   641  
    /// Renders the SQL predicate for one event kind over the given column
    /// expressions. SQL twin of `matches_event_kind` — keep the class and
    /// payload-type lists in sync with it.
    fn single_event_kind_clause(
        event_class_expr: &str,
        payload_type_expr: &str,
        kind: SearchEventKind,
    ) -> String {
        match kind {
            SearchEventKind::Message => format!(
                "({event_class_expr} = 'message' OR ({event_class_expr} = 'event_msg' AND {payload_type_expr} IN ('user_message', 'agent_message', 'message', 'text')))"
            ),
            SearchEventKind::Reasoning => format!(
                "({event_class_expr} = 'reasoning' OR ({event_class_expr} = 'event_msg' AND {payload_type_expr} = 'agent_reasoning'))"
            ),
            SearchEventKind::ToolCall => format!(
                "({event_class_expr} = 'tool_call' OR ({event_class_expr} = 'event_msg' AND {payload_type_expr} IN ('tool_use', 'function_call', 'custom_tool_call', 'web_search_call')))"
            ),
            SearchEventKind::ToolResult => format!(
                "({event_class_expr} = 'tool_result' OR ({event_class_expr} = 'event_msg' AND {payload_type_expr} IN ('tool_result', 'function_call_output', 'custom_tool_call_output', 'search_results_received')))"
            ),
        }
    }
   662  
   663     fn event_kind_filter_clause(
   664         event_class_expr: &str,
   665         payload_type_expr: &str,
   666         event_kinds: &[SearchEventKind],
   667     ) -> String {
   668         let clauses = event_kinds
   669             .iter()
   670             .map(|kind| Self::single_event_kind_clause(event_class_expr, payload_type_expr, *kind))
   671             .collect::<Vec<_>>();
   672         if clauses.len() == 1 {
   673             clauses[0].clone()
   674         } else {
   675             format!("({})", clauses.join(" OR "))
   676         }
   677     }
   678  
   679     fn is_low_information_system_event(actor_role: &str, payload_type: &str) -> bool {
   680         actor_role.eq_ignore_ascii_case("system")
   681             && matches!(
   682                 payload_type.to_ascii_lowercase().as_str(),
   683                 "progress" | "file_history_snapshot" | "system"
   684             )
   685     }
   686  
   687     fn open_context_filter_clause(include_system_events: bool) -> &'static str {
   688         if include_system_events {
   689             ""
   690         } else {
   691             " AND NOT (lowerUTF8(actor_role) = 'system' AND lowerUTF8(payload_type) IN ('progress', 'file_history_snapshot', 'system'))"
   692         }
   693     }
   694  
    /// Maps a raw summary row into the domain type, decoding the mode string.
    fn map_conversation_row(row: ConversationSummaryRow) -> ConversationSummary {
        ConversationSummary {
            session_id: row.session_id,
            first_event_time: row.first_event_time,
            first_event_unix_ms: row.first_event_unix_ms,
            last_event_time: row.last_event_time,
            last_event_unix_ms: row.last_event_unix_ms,
            total_turns: row.total_turns,
            total_events: row.total_events,
            user_messages: row.user_messages,
            assistant_messages: row.assistant_messages,
            tool_calls: row.tool_calls,
            tool_results: row.tool_results,
            mode: Self::parse_mode(&row.mode),
        }
    }
   711  
    /// Field-for-field mapping of a raw turn row into the domain type.
    fn map_turn_row(row: TurnSummaryRow) -> TurnSummary {
        TurnSummary {
            session_id: row.session_id,
            turn_seq: row.turn_seq,
            turn_id: row.turn_id,
            started_at: row.started_at,
            started_at_unix_ms: row.started_at_unix_ms,
            ended_at: row.ended_at,
            ended_at_unix_ms: row.ended_at_unix_ms,
            total_events: row.total_events,
            user_messages: row.user_messages,
            assistant_messages: row.assistant_messages,
            tool_calls: row.tool_calls,
            tool_results: row.tool_results,
            reasoning_items: row.reasoning_items,
        }
    }
   729  
    /// Field-for-field mapping of a raw trace event row into the domain type.
    fn map_trace_event(row: TraceEventRow) -> TraceEvent {
        TraceEvent {
            session_id: row.session_id,
            event_uid: row.event_uid,
            event_order: row.event_order,
            turn_seq: row.turn_seq,
            event_time: row.event_time,
            actor_role: row.actor_role,
            event_class: row.event_class,
            payload_type: row.payload_type,
            call_id: row.call_id,
            name: row.name,
            phase: row.phase,
            item_id: row.item_id,
            source_ref: row.source_ref,
            text_content: row.text_content,
            payload_json: row.payload_json,
            token_usage_json: row.token_usage_json,
        }
    }
   750  
    /// Renders an optional mode filter against the `m` alias of the mode
    /// subquery; absent mode rows default to 'chat', matching `parse_mode`.
    fn mode_filter_clause(mode: Option<ConversationMode>) -> Option<String> {
        mode.map(|m| format!("ifNull(m.mode, 'chat') = {}", sql_quote(m.as_str())))
    }
   754  
    /// Loads every turn summary for `session_id` from `v_turn_summary`,
    /// ordered by `turn_seq`. The id is embedded as a `sql_quote`-escaped
    /// literal.
    async fn load_turns_for_session(&self, session_id: &str) -> RepoResult<Vec<TurnSummary>> {
        let turn_summary = self.table_ref("v_turn_summary");
        let query = format!(
            "SELECT
  session_id,
  toUInt32(turn_seq) AS turn_seq,
  ifNull(turn_id, '') AS turn_id,
  toString(started_at) AS started_at,
  toInt64(toUnixTimestamp64Milli(started_at)) AS started_at_unix_ms,
  toString(ended_at) AS ended_at,
  toInt64(toUnixTimestamp64Milli(ended_at)) AS ended_at_unix_ms,
  toUInt64(total_events) AS total_events,
  toUInt64(user_messages) AS user_messages,
  toUInt64(assistant_messages) AS assistant_messages,
  toUInt64(tool_calls) AS tool_calls,
  toUInt64(tool_results) AS tool_results,
  toUInt64(reasoning_items) AS reasoning_items
FROM {turn_summary}
WHERE session_id = {}
ORDER BY turn_seq ASC
FORMAT JSONEachRow",
            sql_quote(session_id),
        );

        let rows: Vec<TurnSummaryRow> = self.map_backend(self.ch.query_rows(&query, None).await)?;
        Ok(rows.into_iter().map(Self::map_turn_row).collect())
    }
   782  
    /// Loads a single conversation summary from `v_session_summary`, joined
    /// with the mode classification subquery (missing mode → 'chat').
    /// Returns `None` when the session does not exist.
    async fn load_conversation_summary(
        &self,
        session_id: &str,
    ) -> RepoResult<Option<ConversationSummary>> {
        let session_summary = self.table_ref("v_session_summary");
        let mode_subquery = self.mode_subquery();
        let query = format!(
            "SELECT
  s.session_id,
  toString(s.first_event_time) AS first_event_time,
  toInt64(toUnixTimestamp64Milli(s.first_event_time)) AS first_event_unix_ms,
  toString(s.last_event_time) AS last_event_time,
  toInt64(toUnixTimestamp64Milli(s.last_event_time)) AS last_event_unix_ms,
  toUInt32(s.total_turns) AS total_turns,
  toUInt64(s.total_events) AS total_events,
  toUInt64(s.user_messages) AS user_messages,
  toUInt64(s.assistant_messages) AS assistant_messages,
  toUInt64(s.tool_calls) AS tool_calls,
  toUInt64(s.tool_results) AS tool_results,
  ifNull(m.mode, 'chat') AS mode
FROM {session_summary} AS s
LEFT JOIN ({mode_subquery}) AS m ON m.session_id = s.session_id
WHERE s.session_id = {}
LIMIT 1
FORMAT JSONEachRow",
            sql_quote(session_id),
        );

        let rows: Vec<ConversationSummaryRow> =
            self.map_backend(self.ch.query_rows(&query, None).await)?;
        Ok(rows.into_iter().next().map(Self::map_conversation_row))
    }
   815  
   816     async fn corpus_stats(&self) -> RepoResult<(u64, u64)> {
   817         let now = Instant::now();
   818         {
   819             let cache = self.stats_cache.read().await;
   820             if let Some(entry) = cache.corpus_stats.as_ref() {
   821                 if now.duration_since(entry.fetched_at) <= CORPUS_STATS_CACHE_TTL {
   822                     return Ok((entry.docs, entry.total_doc_len));
   823                 }
   824             }
   825         }
   826  
   827         let from_stats_query = format!(
   828             "SELECT toUInt64(ifNull(sum(docs), 0)) AS docs, toUInt64(ifNull(sum(total_doc_len), 0)) AS total_doc_len FROM {} FORMAT JSONEachRow",
   829             self.table_ref("search_corpus_stats")
   830         );
   831  
   832         let from_stats: Vec<CorpusStatsRow> =
   833             self.map_backend(self.ch.query_rows(&from_stats_query, None).await)?;
   834  
   835         if let Some(row) = from_stats.first() {
   836             if row.docs > 0 {
   837                 self.cache_corpus_stats(row.docs, row.total_doc_len, now)
   838                     .await;
   839                 return Ok((row.docs, row.total_doc_len));
   840             }
   841         }
   842  
   843         let fallback_query = format!(
   844             "SELECT toUInt64(count()) AS docs, toUInt64(ifNull(sum(doc_len), 0)) AS total_doc_len FROM {} FINAL WHERE doc_len > 0 FORMAT JSONEachRow",
   845             self.table_ref("search_documents")
   846         );
   847         let fallback: Vec<CorpusStatsRow> =
   848             self.map_backend(self.ch.query_rows(&fallback_query, None).await)?;
   849         let resolved = if let Some(row) = fallback.first() {
   850             (row.docs, row.total_doc_len)
   851         } else {
   852             (0, 0)
   853         };
   854  
   855         let mut cache = self.stats_cache.write().await;
   856         cache.corpus_stats = Some(CorpusStatsCacheEntry {
   857             docs: resolved.0,
   858             total_doc_len: resolved.1,
   859             fetched_at: now,
   860         });
   861         Ok(resolved)
   862     }
   863  
   864     async fn cache_corpus_stats(&self, docs: u64, total_doc_len: u64, fetched_at: Instant) {
   865         let mut cache = self.stats_cache.write().await;
   866         cache.corpus_stats = Some(CorpusStatsCacheEntry {
   867             docs,
   868             total_doc_len,
   869             fetched_at,
   870         });
   871     }
   872  
   873     async fn cache_term_df_values(
   874         &self,
   875         terms: impl IntoIterator<Item = String>,
   876         map: &HashMap<String, u64>,
   877         fetched_at: Instant,
   878     ) {
   879         let mut cache = self.stats_cache.write().await;
   880         for term in terms {
   881             let df = *map.get(&term).unwrap_or(&0);
   882             cache
   883                 .term_df_by_term
   884                 .insert(term, TermDfCacheEntry { df, fetched_at });
   885         }
   886     }
   887  
   888     async fn load_hot_queries_for_prewarm(&self, limit: usize) -> RepoResult<Vec<String>> {
   889         let query = format!(
   890             "SELECT raw_query
   891 FROM (
   892   SELECT
   893     raw_query,
   894     count() AS query_count,
   895     avg(response_ms) AS avg_response_ms
   896   FROM {}
   897   WHERE source = 'moraine-mcp'
   898     AND ts >= now() - INTERVAL 7 DAY
   899     AND lengthUTF8(trim(BOTH ' ' FROM raw_query)) > 0
   900   GROUP BY raw_query
   901   ORDER BY query_count DESC, avg_response_ms DESC
   902   LIMIT {}
   903 )
   904 FORMAT JSONEachRow",
   905             self.table_ref("search_query_log"),
   906             limit
   907         );
   908         let rows: Vec<HotQueryRow> = self.map_backend(self.ch.query_rows(&query, None).await)?;
   909         Ok(rows.into_iter().map(|row| row.raw_query).collect())
   910     }
   911  
   912     async fn df_map(&self, terms: &[String]) -> RepoResult<HashMap<String, u64>> {
   913         let now = Instant::now();
   914         let postings_table = self.table_ref("search_postings");
   915  
   916         let mut map = HashMap::<String, u64>::new();
   917         let mut missing_terms = Vec::<String>::new();
   918  
   919         {
   920             let cache = self.stats_cache.read().await;
   921             for term in terms {
   922                 if let Some(entry) = cache.term_df_by_term.get(term) {
   923                     if now.duration_since(entry.fetched_at) <= TERM_DF_CACHE_TTL {
   924                         map.insert(term.clone(), entry.df);
   925                         continue;
   926                     }
   927                 }
   928                 missing_terms.push(term.clone());
   929             }
   930         }
   931  
   932         if missing_terms.is_empty() {
   933             return Ok(map);
   934         }
   935  
   936         let missing_terms_array = sql_array_strings(&missing_terms);
   937         let df_query = format!(
   938             "SELECT term, toUInt64(uniqExact(doc_id)) AS df FROM {postings_table} WHERE term IN {missing_terms_array} GROUP BY term FORMAT JSONEachRow",
   939         );
   940         let rows: Vec<DfRow> = self.map_backend(self.ch.query_rows(&df_query, None).await)?;
   941         for row in rows {
   942             map.insert(row.term, row.df);
   943         }
   944  
   945         for term in &missing_terms {
   946             map.entry(term.clone()).or_insert(0);
   947         }
   948  
   949         self.cache_term_df_values(missing_terms, &map, now).await;
   950         Ok(map)
   951     }
   952  
   953     fn build_search_events_sql(
   954         &self,
   955         terms: &[String],
   956         idf_by_term: &HashMap<String, f64>,
   957         avgdl: f64,
   958         include_tool_events: bool,
   959         event_kinds: Option<&[SearchEventKind]>,
   960         exclude_codex_mcp: bool,
   961         use_document_codex_flag: bool,
   962         session_id: Option<&str>,
   963         min_should_match: u16,
   964         min_score: f64,
   965         limit: u16,
   966     ) -> RepoResult<String> {
   967         if terms.is_empty() {
   968             return Err(RepoError::invalid_argument(
   969                 "cannot build search query with empty terms",
   970             ));
   971         }
   972  
   973         let postings_table = self.table_ref("search_postings");
   974         let documents_table = self.table_ref("search_documents");
   975         let terms_array_sql = sql_array_strings(terms);
   976         let idf_vals: Vec<f64> = terms
   977             .iter()
   978             .map(|t| *idf_by_term.get(t).unwrap_or(&0.0))
   979             .collect();
   980         let idf_array_sql = sql_array_f64(&idf_vals);
   981         let documents_join_sql = if use_document_codex_flag {
   982             format!(
   983                 "(SELECT
   984   event_uid,
   985   any(session_id) AS session_id,
   986   any(source_name) AS source_name,
   987   any(provider) AS provider,
   988   any(event_class) AS event_class,
   989   any(payload_type) AS payload_type,
   990   any(actor_role) AS actor_role,
   991   any(name) AS name,
   992   any(phase) AS phase,
   993   any(source_ref) AS source_ref,
   994   any(doc_len) AS doc_len,
   995   any(text_content) AS text_content,
   996   any(payload_json) AS payload_json,
   997   toUInt8(any(has_codex_mcp)) AS has_codex_mcp
   998 FROM {documents_table}
   999 GROUP BY event_uid)"
  1000             )
  1001         } else {
  1002             format!(
  1003                 "(SELECT
  1004   event_uid,
  1005   any(session_id) AS session_id,
  1006   any(source_name) AS source_name,
  1007   any(provider) AS provider,
  1008   any(event_class) AS event_class,
  1009   any(payload_type) AS payload_type,
  1010   any(actor_role) AS actor_role,
  1011   any(name) AS name,
  1012   any(phase) AS phase,
  1013   any(source_ref) AS source_ref,
  1014   any(doc_len) AS doc_len,
  1015   any(text_content) AS text_content,
  1016   any(payload_json) AS payload_json,
  1017   toUInt8(0) AS has_codex_mcp
  1018 FROM {documents_table}
  1019 GROUP BY event_uid)"
  1020             )
  1021         };
  1022  
  1023         let mut where_clauses = vec![format!("p.term IN {}", terms_array_sql)];
  1024  
  1025         if let Some(sid) = session_id {
  1026             where_clauses.push(format!("d.session_id = {}", sql_quote(sid)));
  1027         }
  1028  
  1029         if let Some(event_kinds) = event_kinds {
  1030             where_clauses.push(Self::event_kind_filter_clause(
  1031                 "d.event_class",
  1032                 "d.payload_type",
  1033                 event_kinds,
  1034             ));
  1035         } else if include_tool_events {
  1036             where_clauses.push("d.payload_type != 'token_count'".to_string());
  1037         } else {
  1038             where_clauses
  1039                 .push("d.event_class IN ('message', 'reasoning', 'event_msg')".to_string());
  1040             where_clauses.push(
  1041                 "d.payload_type NOT IN ('token_count', 'task_started', 'task_complete', 'turn_aborted', 'item_completed')"
  1042                     .to_string(),
  1043             );
  1044         }
  1045  
  1046         if exclude_codex_mcp {
  1047             if use_document_codex_flag {
  1048                 where_clauses.push("toUInt8(d.has_codex_mcp) = 0".to_string());
  1049             } else {
  1050                 where_clauses.push(
  1051                     "positionCaseInsensitiveUTF8(d.payload_json, 'codex-mcp') = 0".to_string(),
  1052                 );
  1053             }
  1054             where_clauses.push("lowerUTF8(d.name) NOT IN ('search', 'open')".to_string());
  1055         }
  1056  
  1057         let where_sql = where_clauses.join("\n  AND ");
  1058         let k1 = self.cfg.bm25_k1.max(0.01);
  1059         let b = self.cfg.bm25_b.clamp(0.0, 1.0);
  1060  
  1061         Ok(format!(
  1062             "WITH
  1063   {k1:.6} AS k1,
  1064   {b:.6} AS b,
  1065   greatest({avgdl:.6}, 1.0) AS avgdl,
  1066   {terms_array_sql} AS q_terms,
  1067   {idf_array_sql} AS q_idf
  1068 SELECT
  1069   p.doc_id AS event_uid,
  1070   any(d.session_id) AS session_id,
  1071   any(d.source_name) AS source_name,
  1072   any(d.provider) AS provider,
  1073   any(d.event_class) AS event_class,
  1074   any(d.payload_type) AS payload_type,
  1075   any(d.actor_role) AS actor_role,
  1076   any(d.name) AS name,
  1077   any(d.phase) AS phase,
  1078   any(d.source_ref) AS source_ref,
  1079   any(d.doc_len) AS doc_len,
  1080   leftUTF8(any(d.text_content), {preview}) AS text_preview,
  1081   sum(
  1082     transform(toString(p.term), q_terms, q_idf, 0.0)
  1083     *
  1084     (
  1085       (toFloat64(p.tf) * (k1 + 1.0))
  1086       /
  1087       (toFloat64(p.tf) + k1 * (1.0 - b + b * (toFloat64(p.doc_len) / avgdl)))
  1088     )
  1089   ) AS score,
  1090   uniqExact(p.term) AS matched_terms
  1091 FROM {postings_table} AS p
  1092 INNER JOIN {documents_join_sql} AS d ON d.event_uid = p.doc_id
  1093 WHERE {where_sql}
  1094 GROUP BY p.doc_id
  1095 HAVING matched_terms >= {min_should_match} AND score >= {min_score:.6}
  1096 ORDER BY score DESC, event_uid ASC
  1097 LIMIT {limit}
  1098 FORMAT JSONEachRow",
  1099             preview = self.cfg.preview_chars,
  1100             postings_table = postings_table,
  1101             documents_join_sql = documents_join_sql,
  1102         ))
  1103     }
  1104  
  1105     fn build_search_events_hydrate_sql(
  1106         &self,
  1107         event_uids: &[String],
  1108         use_document_codex_flag: bool,
  1109     ) -> RepoResult<String> {
  1110         if event_uids.is_empty() {
  1111             return Err(RepoError::invalid_argument(
  1112                 "cannot hydrate search rows for empty event_uids",
  1113             ));
  1114         }
  1115         let documents_table = self.table_ref("search_documents");
  1116         let event_uids_array = sql_array_strings(event_uids);
  1117         let codex_expr = if use_document_codex_flag {
  1118             "d.has_codex_mcp"
  1119         } else {
  1120             "toUInt8(positionCaseInsensitiveUTF8(d.payload_json, 'codex-mcp') > 0)"
  1121         };
  1122         let documents_source_sql = if use_document_codex_flag {
  1123             format!(
  1124                 "(SELECT
  1125   event_uid,
  1126   any(session_id) AS session_id,
  1127   any(source_name) AS source_name,
  1128   any(provider) AS provider,
  1129   any(event_class) AS event_class,
  1130   any(payload_type) AS payload_type,
  1131   any(actor_role) AS actor_role,
  1132   any(name) AS name,
  1133   any(phase) AS phase,
  1134   any(source_ref) AS source_ref,
  1135   any(doc_len) AS doc_len,
  1136   any(text_content) AS text_content,
  1137   any(payload_json) AS payload_json,
  1138   toUInt8(any(has_codex_mcp)) AS has_codex_mcp
  1139 FROM {documents_table}
  1140 WHERE event_uid IN {event_uids_array}
  1141 GROUP BY event_uid)"
  1142             )
  1143         } else {
  1144             format!(
  1145                 "(SELECT
  1146   event_uid,
  1147   any(session_id) AS session_id,
  1148   any(source_name) AS source_name,
  1149   any(provider) AS provider,
  1150   any(event_class) AS event_class,
  1151   any(payload_type) AS payload_type,
  1152   any(actor_role) AS actor_role,
  1153   any(name) AS name,
  1154   any(phase) AS phase,
  1155   any(source_ref) AS source_ref,
  1156   any(doc_len) AS doc_len,
  1157   any(text_content) AS text_content,
  1158   any(payload_json) AS payload_json,
  1159   toUInt8(0) AS has_codex_mcp
  1160 FROM {documents_table}
  1161 WHERE event_uid IN {event_uids_array}
  1162 GROUP BY event_uid)"
  1163             )
  1164         };
  1165  
  1166         Ok(format!(
  1167             "SELECT
  1168   d.event_uid AS event_uid,
  1169   d.session_id AS session_id,
  1170   d.source_name AS source_name,
  1171   d.provider AS provider,
  1172   d.event_class AS event_class,
  1173   d.payload_type AS payload_type,
  1174   d.actor_role AS actor_role,
  1175   d.name AS name,
  1176   d.phase AS phase,
  1177   d.source_ref AS source_ref,
  1178   d.doc_len AS doc_len,
  1179   leftUTF8(d.text_content, {preview}) AS text_preview,
  1180   {codex_expr} AS has_codex_mcp
  1181 FROM {documents_source_sql} AS d
  1182 FORMAT JSONEachRow",
  1183             preview = self.cfg.preview_chars,
  1184             codex_expr = codex_expr,
  1185             documents_source_sql = documents_source_sql,
  1186         ))
  1187     }
  1188  
  1189     async fn search_documents_has_codex_flag(&self) -> RepoResult<bool> {
  1190         let now = Instant::now();
  1191         {
  1192             let cache = self.stats_cache.read().await;
  1193             if let Some((value, fetched_at)) = cache.has_codex_flag_column {
  1194                 if now.duration_since(fetched_at) <= SEARCH_SCHEMA_CACHE_TTL {
  1195                     return Ok(value);
  1196                 }
  1197             }
  1198         }
  1199  
  1200         let query = format!(
  1201             "SELECT
  1202   toUInt8(count() > 0) AS exists
  1203 FROM system.columns
  1204 WHERE database = {}
  1205   AND table = 'search_documents'
  1206   AND name = 'has_codex_mcp'
  1207 FORMAT JSONEachRow",
  1208             sql_quote(&self.ch.config().database)
  1209         );
  1210         let rows: Vec<ColumnExistsRow> =
  1211             self.map_backend(self.ch.query_rows(&query, None).await)?;
  1212         let exists = rows.first().map(|row| row.exists != 0).unwrap_or(false);
  1213  
  1214         let mut cache = self.stats_cache.write().await;
  1215         cache.has_codex_flag_column = Some((exists, now));
  1216         Ok(exists)
  1217     }
  1218  
  1219     fn passes_search_doc_filters(
  1220         row: &SearchDocExtraCacheEntry,
  1221         include_tool_events: bool,
  1222         event_kinds: Option<&[SearchEventKind]>,
  1223         exclude_codex_mcp: bool,
  1224         session_id: Option<&str>,
  1225     ) -> bool {
  1226         if let Some(sid) = session_id {
  1227             if row.session_id != sid {
  1228                 return false;
  1229             }
  1230         }
  1231  
  1232         if let Some(event_kinds) = event_kinds {
  1233             if !Self::matches_requested_event_kinds(
  1234                 &row.event_class,
  1235                 &row.payload_type,
  1236                 event_kinds,
  1237             ) {
  1238                 return false;
  1239             }
  1240         } else if include_tool_events {
  1241             if row.payload_type == "token_count" {
  1242                 return false;
  1243             }
  1244         } else {
  1245             if row.event_class != "message"
  1246                 && row.event_class != "reasoning"
  1247                 && row.event_class != "event_msg"
  1248             {
  1249                 return false;
  1250             }
  1251             if row.payload_type == "token_count"
  1252                 || row.payload_type == "task_started"
  1253                 || row.payload_type == "task_complete"
  1254                 || row.payload_type == "turn_aborted"
  1255                 || row.payload_type == "item_completed"
  1256             {
  1257                 return false;
  1258             }
  1259         }
  1260  
  1261         if exclude_codex_mcp {
  1262             if row.has_codex_mcp != 0 {
  1263                 return false;
  1264             }
  1265             if row.name.eq_ignore_ascii_case("search") || row.name.eq_ignore_ascii_case("open") {
  1266                 return false;
  1267             }
  1268         }
  1269  
  1270         true
  1271     }
  1272  
  1273     fn bm25_term_score(tf: u16, doc_len: u32, avgdl: f64, k1: f64, b: f64) -> f64 {
  1274         let tf = tf as f64;
  1275         let norm = tf + k1 * (1.0 - b + b * (doc_len as f64 / avgdl.max(1.0)));
  1276         if norm <= 0.0 {
  1277             0.0
  1278         } else {
  1279             tf * (k1 + 1.0) / norm
  1280         }
  1281     }
  1282  
  1283     fn bm25_idf(docs: u64, df: u64) -> f64 {
  1284         let idf = if df == 0 {
  1285             (1.0 + ((docs as f64 + 0.5) / 0.5)).ln()
  1286         } else {
  1287             let n = docs.max(df) as f64;
  1288             (1.0 + ((n - df as f64 + 0.5) / (df as f64 + 0.5))).ln()
  1289         };
  1290         idf.max(0.0)
  1291     }
  1292  
    /// Fetch posting rows for `terms`, serving from the per-term cache when an
    /// entry is still fresh and querying ClickHouse only for the rest.
    ///
    /// Every requested term is guaranteed a key in the returned map (possibly
    /// mapping to an empty slice). Terms whose posting lists exceed the
    /// per-term row budget are returned but not cached; the cache evicts
    /// oldest-first to stay under its entry and total-row budgets.
    async fn load_term_postings_for_terms(
        &self,
        terms: &[String],
    ) -> RepoResult<HashMap<String, Arc<[CachedPostingRow]>>> {
        let now = Instant::now();
        let mut by_term = HashMap::<String, Arc<[CachedPostingRow]>>::new();
        let mut missing_terms = Vec::<String>::new();

        // Read-lock pass: split terms into fresh cache hits and (missing or
        // stale) misses.
        {
            let cache = self.term_postings_cache.read().await;
            for term in terms {
                if let Some(entry) = cache.get(term) {
                    if now.duration_since(entry.fetched_at) <= TERM_POSTINGS_CACHE_TTL {
                        by_term.insert(term.clone(), Arc::clone(&entry.rows));
                        continue;
                    }
                }
                missing_terms.push(term.clone());
            }
        }

        if !missing_terms.is_empty() {
            let postings_table = self.table_ref("search_postings");
            let terms_array = sql_array_strings(&missing_terms);
            let query = format!(
                "SELECT
  term,
  doc_id AS event_uid,
  doc_len,
  tf
FROM {postings_table}
WHERE term IN {terms_array}
FORMAT JSONEachRow",
            );

            let fetched_rows: Vec<FetchedPostingRow> =
                self.map_backend(self.ch.query_rows(&query, None).await)?;
            // Group the flat result rows by term before sharing/caching them.
            let mut grouped = HashMap::<String, Vec<CachedPostingRow>>::new();
            for row in fetched_rows {
                grouped.entry(row.term).or_default().push(CachedPostingRow {
                    event_uid: row.event_uid,
                    doc_len: row.doc_len,
                    tf: row.tf,
                });
            }

            let mut cache = self.term_postings_cache.write().await;

            for term in missing_terms {
                // Terms with no postings still get an (empty) entry so every
                // requested term resolves.
                let rows_vec = grouped.remove(&term).unwrap_or_default();
                let rows: Arc<[CachedPostingRow]> = Arc::from(rows_vec.into_boxed_slice());
                by_term.insert(term.clone(), Arc::clone(&rows));
                // Skip caching very common terms so one term cannot dominate
                // the cache's row budget.
                if rows.len() <= TERM_POSTINGS_CACHE_MAX_ROWS_PER_TERM {
                    cache.insert(
                        term,
                        TermPostingsCacheEntry {
                            rows,
                            fetched_at: now,
                        },
                    );
                }
            }

            // Evict oldest entries until both the entry-count and the total
            // cached-row budgets are satisfied. Each eviction scans the whole
            // cache for the minimum fetched_at; acceptable while the cache
            // stays small by construction.
            let mut cached_rows_total = cache.values().map(|entry| entry.rows.len()).sum::<usize>();
            while cache.len() > TERM_POSTINGS_CACHE_MAX_ENTRIES
                || cached_rows_total > TERM_POSTINGS_CACHE_MAX_ROWS_TOTAL
            {
                if let Some(oldest_key) = cache
                    .iter()
                    .min_by_key(|(_, entry)| entry.fetched_at)
                    .map(|(k, _)| k.clone())
                {
                    if let Some(evicted) = cache.remove(&oldest_key) {
                        cached_rows_total = cached_rows_total.saturating_sub(evicted.rows.len());
                    }
                } else {
                    // Cache unexpectedly empty while over budget; bail out.
                    break;
                }
            }
        }

        // Guarantee a key for every requested term, including stale hits that
        // ClickHouse returned no rows for.
        for term in terms {
            by_term
                .entry(term.clone())
                .or_insert_with(|| Arc::from(Vec::<CachedPostingRow>::new().into_boxed_slice()));
        }
        Ok(by_term)
    }
  1381  
    /// Fetch per-document metadata (session, classes, preview, codex flag) for
    /// `event_uids`, serving fresh entries from the extras cache and hydrating
    /// only the missing or stale ones from ClickHouse.
    async fn load_search_doc_extras(
        &self,
        event_uids: &[String],
        use_document_codex_flag: bool,
    ) -> RepoResult<HashMap<String, SearchDocExtraCacheEntry>> {
        let now = Instant::now();
        let mut by_uid = HashMap::<String, SearchDocExtraCacheEntry>::new();
        let mut missing_uids = Vec::<String>::new();

        // Read-lock pass: split uids into fresh cache hits and (missing or
        // stale) misses.
        {
            let cache = self.search_doc_extra_cache.read().await;
            for uid in event_uids {
                if let Some(entry) = cache.get(uid) {
                    if now.duration_since(entry.fetched_at) <= SEARCH_DOC_EXTRA_CACHE_TTL {
                        by_uid.insert(uid.clone(), entry.clone());
                        continue;
                    }
                }
                missing_uids.push(uid.clone());
            }
        }

        if !missing_uids.is_empty() {
            let query =
                self.build_search_events_hydrate_sql(&missing_uids, use_document_codex_flag)?;
            let fetched_rows: Vec<SearchDocExtraRow> =
                self.map_backend(self.ch.query_rows(&query, None).await)?;

            let mut cache = self.search_doc_extra_cache.write().await;

            for row in fetched_rows {
                let entry = SearchDocExtraCacheEntry {
                    session_id: row.session_id,
                    source_name: row.source_name,
                    provider: row.provider,
                    event_class: row.event_class,
                    payload_type: row.payload_type,
                    actor_role: row.actor_role,
                    name: row.name,
                    phase: row.phase,
                    source_ref: row.source_ref,
                    doc_len: row.doc_len,
                    text_preview: row.text_preview,
                    has_codex_mcp: row.has_codex_mcp,
                    fetched_at: now,
                };
                by_uid.insert(row.event_uid.clone(), entry.clone());
                cache.insert(row.event_uid, entry);
            }

            // Evict oldest-first until the cache fits its entry budget.
            // NOTE(review): uids that ClickHouse returned no row for stay
            // absent from the result map — callers appear to tolerate missing
            // keys; confirm before relying on full coverage.
            while cache.len() > SEARCH_DOC_EXTRA_CACHE_MAX_ENTRIES {
                if let Some(oldest_key) = cache
                    .iter()
                    .min_by_key(|(_, entry)| entry.fetched_at)
                    .map(|(k, _)| k.clone())
                {
                    cache.remove(&oldest_key);
                } else {
                    break;
                }
            }
        }

        Ok(by_uid)
    }
  1447  
  1448     async fn search_events_rows(
  1449         &self,
  1450         terms: &[String],
  1451         docs: u64,
  1452         avgdl: f64,
  1453         include_tool_events: bool,
  1454         event_kinds: Option<&[SearchEventKind]>,
  1455         exclude_codex_mcp: bool,
  1456         session_id: Option<&str>,
  1457         min_should_match: u16,
  1458         min_score: f64,
  1459         limit: u16,
  1460     ) -> RepoResult<Vec<SearchRow>> {
  1461         let (rows, _candidate_count) = self
  1462             .search_events_rows_fast_pass(
  1463                 terms,
  1464                 docs,
  1465                 avgdl,
  1466                 include_tool_events,
  1467                 event_kinds,
  1468                 exclude_codex_mcp,
  1469                 session_id,
  1470                 min_should_match,
  1471                 min_score,
  1472                 limit,
  1473                 usize::MAX,
  1474             )
  1475             .await?;
  1476         if !rows.is_empty() {
  1477             return Ok(rows);
  1478         }
  1479  
  1480         self.search_events_rows_exact_sql(
  1481             terms,
  1482             docs,
  1483             avgdl,
  1484             include_tool_events,
  1485             event_kinds,
  1486             exclude_codex_mcp,
  1487             session_id,
  1488             min_should_match,
  1489             min_score,
  1490             limit,
  1491         )
  1492         .await
  1493     }
  1494  
  1495     async fn search_events_rows_exact_sql(
  1496         &self,
  1497         terms: &[String],
  1498         docs: u64,
  1499         avgdl: f64,
  1500         include_tool_events: bool,
  1501         event_kinds: Option<&[SearchEventKind]>,
  1502         exclude_codex_mcp: bool,
  1503         session_id: Option<&str>,
  1504         min_should_match: u16,
  1505         min_score: f64,
  1506         limit: u16,
  1507     ) -> RepoResult<Vec<SearchRow>> {
  1508         let df_map = self.df_map(terms).await?;
  1509         let mut idf_by_term = HashMap::<String, f64>::new();
  1510         for term in terms {
  1511             let df = *df_map.get(term).unwrap_or(&0);
  1512             idf_by_term.insert(term.clone(), Self::bm25_idf(docs, df));
  1513         }
  1514  
  1515         let use_document_codex_flag = self.search_documents_has_codex_flag().await?;
  1516         let fallback_sql = self.build_search_events_sql(
  1517             terms,
  1518             &idf_by_term,
  1519             avgdl,
  1520             include_tool_events,
  1521             event_kinds,
  1522             exclude_codex_mcp,
  1523             use_document_codex_flag,
  1524             session_id,
  1525             min_should_match,
  1526             min_score,
  1527             limit,
  1528         )?;
  1529  
  1530         let mut fallback_rows: Vec<SearchRow> =
  1531             self.map_backend(self.ch.query_rows(&fallback_sql, None).await)?;
  1532         fallback_rows.sort_by(|a, b| {
  1533             b.score
  1534                 .total_cmp(&a.score)
  1535                 .then_with(|| a.event_uid.cmp(&b.event_uid))
  1536         });
  1537         Ok(fallback_rows)
  1538     }
  1539  
  1540     async fn search_events_rows_by_strategy(
  1541         &self,
  1542         strategy: SearchEventsStrategy,
  1543         terms: &[String],
  1544         docs: u64,
  1545         avgdl: f64,
  1546         include_tool_events: bool,
  1547         event_kinds: Option<&[SearchEventKind]>,
  1548         exclude_codex_mcp: bool,
  1549         session_id: Option<&str>,
  1550         min_should_match: u16,
  1551         min_score: f64,
  1552         limit: u16,
  1553     ) -> RepoResult<Vec<SearchRow>> {
  1554         match strategy {
  1555             SearchEventsStrategy::Optimized => {
  1556                 self.search_events_rows(
  1557                     terms,
  1558                     docs,
  1559                     avgdl,
  1560                     include_tool_events,
  1561                     event_kinds,
  1562                     exclude_codex_mcp,
  1563                     session_id,
  1564                     min_should_match,
  1565                     min_score,
  1566                     limit,
  1567                 )
  1568                 .await
  1569             }
  1570             SearchEventsStrategy::OracleExact => {
  1571                 self.search_events_rows_exact_sql(
  1572                     terms,
  1573                     docs,
  1574                     avgdl,
  1575                     include_tool_events,
  1576                     event_kinds,
  1577                     exclude_codex_mcp,
  1578                     session_id,
  1579                     min_should_match,
  1580                     min_score,
  1581                     limit,
  1582                 )
  1583                 .await
  1584             }
  1585         }
  1586     }
  1587  
  1588     fn dedupe_fetch_limit(limit: u16) -> u16 {
  1589         limit.saturating_mul(3).max(limit)
  1590     }
  1591  
  1592     fn is_message_search_row(row: &SearchRow) -> bool {
  1593         row.event_class == "message" && row.payload_type == "message"
  1594     }
  1595  
  1596     fn is_event_msg_search_row(row: &SearchRow) -> bool {
  1597         row.event_class == "event_msg"
  1598             && (row.payload_type == "agent_message"
  1599                 || row.payload_type == "user_message"
  1600                 || row.payload_type == "event_msg")
  1601     }
  1602  
    /// True for rows from the canonical "reasoning" event class.
    fn is_reasoning_search_row(row: &SearchRow) -> bool {
        row.event_class == "reasoning"
    }
  1606  
    /// True for event_msg rows that mirror a reasoning row
    /// (payload type "agent_reasoning").
    fn is_event_msg_reasoning_search_row(row: &SearchRow) -> bool {
        row.event_class == "event_msg" && row.payload_type == "agent_reasoning"
    }
  1610  
  1611     fn compact_preview_for_dedup(text: &str) -> String {
  1612         text.split_whitespace().collect::<Vec<_>>().join(" ")
  1613     }
  1614  
  1615     fn search_rows_are_mirrors(a: &SearchRow, b: &SearchRow) -> bool {
  1616         let is_message_pair = (Self::is_message_search_row(a) && Self::is_event_msg_search_row(b))
  1617             || (Self::is_event_msg_search_row(a) && Self::is_message_search_row(b));
  1618         let is_reasoning_pair = (Self::is_reasoning_search_row(a)
  1619             && Self::is_event_msg_reasoning_search_row(b))
  1620             || (Self::is_event_msg_reasoning_search_row(a) && Self::is_reasoning_search_row(b));
  1621         let same_kind_pair = is_message_pair || is_reasoning_pair;
  1622         if !same_kind_pair {
  1623             return false;
  1624         }
  1625  
  1626         if a.session_id != b.session_id
  1627             || a.actor_role != b.actor_role
  1628             || a.matched_terms != b.matched_terms
  1629         {
  1630             return false;
  1631         }
  1632  
  1633         if (a.score - b.score).abs() > 1e-9 {
  1634             return false;
  1635         }
  1636  
  1637         Self::compact_preview_for_dedup(&a.text_preview)
  1638             == Self::compact_preview_for_dedup(&b.text_preview)
  1639     }
  1640  
  1641     fn search_row_kind_priority(row: &SearchRow) -> u8 {
  1642         if Self::is_message_search_row(row) || Self::is_reasoning_search_row(row) {
  1643             0
  1644         } else if Self::is_event_msg_search_row(row) || Self::is_event_msg_reasoning_search_row(row)
  1645         {
  1646             1
  1647         } else {
  1648             2
  1649         }
  1650     }
  1651  
  1652     fn should_replace_mirror(existing: &SearchRow, candidate: &SearchRow) -> bool {
  1653         let existing_priority = Self::search_row_kind_priority(existing);
  1654         let candidate_priority = Self::search_row_kind_priority(candidate);
  1655         candidate_priority < existing_priority
  1656             || (candidate_priority == existing_priority && candidate.event_uid < existing.event_uid)
  1657     }
  1658  
  1659     fn dedupe_search_rows(rows: Vec<SearchRow>, limit: u16) -> Vec<SearchRow> {
  1660         let target = limit as usize;
  1661         let mut deduped = Vec::<SearchRow>::with_capacity(rows.len().min(target));
  1662  
  1663         for row in rows {
  1664             if let Some(existing_idx) = deduped
  1665                 .iter()
  1666                 .position(|existing| Self::search_rows_are_mirrors(existing, &row))
  1667             {
  1668                 if Self::should_replace_mirror(&deduped[existing_idx], &row) {
  1669                     deduped[existing_idx] = row;
  1670                 }
  1671                 continue;
  1672             }
  1673  
  1674             deduped.push(row);
  1675             if deduped.len() >= target {
  1676                 break;
  1677             }
  1678         }
  1679  
  1680         deduped
  1681     }
  1682  
    /// In-memory fast path for event search: scores BM25 from the cached
    /// per-term postings, then hydrates only the winning candidates from
    /// `search_documents`.
    ///
    /// Returns `(rows, candidate_count)` where `rows` holds at most `limit`
    /// fully-hydrated hits and `candidate_count` is how many candidates
    /// passed the `min_should_match` / `min_score` gate before the
    /// `candidate_limit` cut was applied.
    ///
    /// NOTE: only the first 64 query terms participate in scoring and in
    /// the matched-term count — the match bitmask is a single `u64`.
    async fn search_events_rows_fast_pass(
        &self,
        terms: &[String],
        docs: u64,
        avgdl: f64,
        include_tool_events: bool,
        event_kinds: Option<&[SearchEventKind]>,
        exclude_codex_mcp: bool,
        session_id: Option<&str>,
        min_should_match: u16,
        min_score: f64,
        limit: u16,
        candidate_limit: usize,
    ) -> RepoResult<(Vec<SearchRow>, usize)> {
        // Borrowed view over a cached posting row plus its computed score,
        // so candidate selection never clones posting data.
        #[derive(Clone, Copy)]
        struct CandidateRef<'a> {
            row: &'a CachedPostingRow,
            score: f64,
            matched_terms: u64,
        }

        let postings_by_term = self.load_term_postings_for_terms(terms).await?;
        let use_document_codex_flag = self.search_documents_has_codex_flag().await?;
        let df_map = self.df_map(terms).await?;
        // Clamp BM25 parameters into valid ranges regardless of config input.
        let k1 = self.cfg.bm25_k1.max(0.01);
        let b = self.cfg.bm25_b.clamp(0.0, 1.0);
        let mut idf_by_term = HashMap::<&str, f64>::new();
        for term in terms {
            // Terms absent from the df map are treated as unseen (df = 0).
            let df = *df_map.get(term).unwrap_or(&0);
            idf_by_term.insert(term.as_str(), Self::bm25_idf(docs, df));
        }

        // Accumulate per-document scores one term at a time; bit `idx` of
        // `matched_mask` records that query term `idx` hit the document.
        let mut accum_by_uid = HashMap::<&str, SearchScoreAccum<'_>>::new();
        for (idx, term) in terms.iter().enumerate() {
            if idx >= 64 {
                // The mask below is a u64; terms past index 63 are ignored.
                break;
            }
            let idf = *idf_by_term.get(term.as_str()).unwrap_or(&0.0);
            if idf <= 0.0 {
                // Non-positive IDF cannot contribute to the score.
                continue;
            }

            if let Some(rows) = postings_by_term.get(term) {
                for row in rows.iter() {
                    let entry = accum_by_uid
                        .entry(row.event_uid.as_str())
                        .or_insert_with(|| SearchScoreAccum {
                            row,
                            score: 0.0,
                            matched_mask: 0,
                        });

                    entry.score += idf * Self::bm25_term_score(row.tf, row.doc_len, avgdl, k1, b);
                    entry.matched_mask |= 1u64 << idx;
                }
            }
        }

        // Keep only candidates that satisfy both relevance gates.
        let mut fast_candidates = Vec::<CandidateRef<'_>>::new();
        for acc in accum_by_uid.values() {
            let matched_terms = acc.matched_mask.count_ones() as u64;
            if matched_terms < min_should_match as u64 || acc.score < min_score {
                continue;
            }
            fast_candidates.push(CandidateRef {
                row: acc.row,
                score: acc.score,
                matched_terms,
            });
        }

        if fast_candidates.is_empty() {
            return Ok((Vec::new(), 0));
        }

        let candidate_count = fast_candidates.len();
        if candidate_limit < fast_candidates.len() {
            // Partial selection of the top `candidate_limit` before the full
            // sort; ties broken by event_uid so the cut is deterministic.
            fast_candidates.select_nth_unstable_by(candidate_limit, |a, b| {
                b.score
                    .total_cmp(&a.score)
                    .then_with(|| a.row.event_uid.cmp(&b.row.event_uid))
            });
            fast_candidates.truncate(candidate_limit);
        }
        fast_candidates.sort_by(|a, b| {
            b.score
                .total_cmp(&a.score)
                .then_with(|| a.row.event_uid.cmp(&b.row.event_uid))
        });

        // Hydrate document metadata in chunks. Post-filters (tool events,
        // kinds, codex-mcp, session) can drop rows, so each chunk over-fetches
        // relative to `limit`.
        let mut fast_rows = Vec::<SearchRow>::new();
        let hydrate_chunk_size = (limit as usize).saturating_mul(8).max(128);
        let mut offset = 0usize;
        while offset < fast_candidates.len() && fast_rows.len() < limit as usize {
            let end = (offset + hydrate_chunk_size).min(fast_candidates.len());
            let event_uids: Vec<String> = fast_candidates[offset..end]
                .iter()
                .map(|row| row.row.event_uid.clone())
                .collect();
            let doc_extras = self
                .load_search_doc_extras(&event_uids, use_document_codex_flag)
                .await?;

            for row in &fast_candidates[offset..end] {
                // Candidates missing from the extras map are silently skipped.
                let Some(extra) = doc_extras.get(row.row.event_uid.as_str()) else {
                    continue;
                };
                if !Self::passes_search_doc_filters(
                    extra,
                    include_tool_events,
                    event_kinds,
                    exclude_codex_mcp,
                    session_id,
                ) {
                    continue;
                }

                fast_rows.push(SearchRow {
                    event_uid: row.row.event_uid.clone(),
                    session_id: extra.session_id.clone(),
                    source_name: extra.source_name.clone(),
                    provider: extra.provider.clone(),
                    event_class: extra.event_class.clone(),
                    payload_type: extra.payload_type.clone(),
                    actor_role: extra.actor_role.clone(),
                    name: extra.name.clone(),
                    phase: extra.phase.clone(),
                    source_ref: extra.source_ref.clone(),
                    doc_len: extra.doc_len,
                    text_preview: extra.text_preview.clone(),
                    score: row.score,
                    matched_terms: row.matched_terms,
                });

                if fast_rows.len() >= limit as usize {
                    break;
                }
            }
            offset = end;
        }

        Ok((fast_rows, candidate_count))
    }
  1826  
  1827     fn conversation_candidate_limit(limit: u16) -> usize {
  1828         (limit as usize)
  1829             .saturating_mul(CONVERSATION_CANDIDATE_MULTIPLIER)
  1830             .clamp(CONVERSATION_CANDIDATE_MIN, CONVERSATION_CANDIDATE_MAX)
  1831     }
  1832  
  1833     fn now_unix_ms() -> i64 {
  1834         SystemTime::now()
  1835             .duration_since(UNIX_EPOCH)
  1836             .map(|d| d.as_millis() as i64)
  1837             .unwrap_or_default()
  1838     }
  1839  
    /// Builds the SQL filter fragments shared by the conversation-search
    /// queries over `search_postings` (aliased `p`), optionally joined to
    /// `search_documents` (aliased `d`).
    ///
    /// Returns `(docs_join_sql, prewhere_sql, where_sql)`:
    /// - `docs_join_sql`: join clause to `search_documents`, emitted only
    ///   when at least one time filter needs document ingest timestamps.
    /// - `prewhere_sql`: AND-joined predicates over the postings table.
    /// - `where_sql`: AND-joined predicates over the joined documents
    ///   (with leading `WHERE`), or empty when no time filters apply.
    fn build_conversation_postings_filter_sql(
        &self,
        terms: &[String],
        include_tool_events: bool,
        exclude_codex_mcp: bool,
        from_unix_ms: Option<i64>,
        to_unix_ms: Option<i64>,
        recent_from_unix_ms: Option<i64>,
        candidate_session_ids: Option<&[String]>,
    ) -> (String, String, String) {
        // Term membership is always the first PREWHERE predicate.
        let terms_array_sql = sql_array_strings(terms);
        let mut postings_filters = vec![format!("p.term IN {}", terms_array_sql)];
        let mut document_filters = Vec::new();

        if let Some(from_unix_ms) = from_unix_ms {
            document_filters.push(format!(
                "toUnixTimestamp64Milli(d.ingested_at) >= {from_unix_ms}"
            ));
        }
        if let Some(to_unix_ms) = to_unix_ms {
            document_filters.push(format!(
                "toUnixTimestamp64Milli(d.ingested_at) < {to_unix_ms}"
            ));
        }
        // `recent_from_unix_ms` adds a second lower bound alongside
        // `from_unix_ms` (if present); the stricter of the two wins.
        if let Some(recent_from_unix_ms) = recent_from_unix_ms {
            document_filters.push(format!(
                "toUnixTimestamp64Milli(d.ingested_at) >= {recent_from_unix_ms}"
            ));
        }

        if include_tool_events {
            // Even with tool events on, token_count rows are never relevant.
            postings_filters.push("p.payload_type != 'token_count'".to_string());
        } else {
            // Restrict to conversational classes and drop lifecycle payloads.
            postings_filters
                .push("p.event_class IN ('message', 'reasoning', 'event_msg')".to_string());
            postings_filters.push(
                "p.payload_type NOT IN ('token_count', 'task_started', 'task_complete', 'turn_aborted', 'item_completed')"
                    .to_string(),
            );
        }

        if exclude_codex_mcp {
            // Drop codex-mcp's own traffic plus its search/open tool calls.
            postings_filters.push("p.source_name != 'codex-mcp'".to_string());
            postings_filters.push("lowerUTF8(p.name) NOT IN ('search', 'open')".to_string());
        }

        if let Some(candidate_session_ids) = candidate_session_ids {
            if !candidate_session_ids.is_empty() {
                postings_filters.push(format!(
                    "p.session_id IN {}",
                    sql_array_strings(candidate_session_ids)
                ));
            }
        }

        let prewhere_sql = postings_filters.join("\n      AND ");
        let where_sql = if document_filters.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", document_filters.join("\n      AND "))
        };
        // Only pay for the documents join when a document filter exists.
        let docs_join_sql = if document_filters.is_empty() {
            String::new()
        } else {
            let documents_table = self.table_ref("search_documents");
            format!("ANY INNER JOIN {documents_table} AS d ON d.event_uid = p.doc_id")
        };
        (docs_join_sql, prewhere_sql, where_sql)
    }
  1909  
    /// Builds the conversation-candidate SQL against the pre-aggregated
    /// `search_conversation_terms` table.
    ///
    /// Per-session score is `sum(idf(term) * log1p(tf_sum))` over the query
    /// terms, restricted to sessions that have at least one posting passing
    /// the standard postings/document filters. Emits rows of
    /// `(session_id, score, matched_terms)` ordered by score descending,
    /// capped at `limit`.
    ///
    /// # Errors
    /// Returns `invalid_argument` when `terms` is empty.
    fn build_search_conversation_candidates_sql(
        &self,
        terms: &[String],
        idf_by_term: &HashMap<String, f64>,
        include_tool_events: bool,
        exclude_codex_mcp: bool,
        min_should_match: u16,
        limit: usize,
        from_unix_ms: Option<i64>,
        to_unix_ms: Option<i64>,
        mode: Option<ConversationMode>,
    ) -> RepoResult<String> {
        if terms.is_empty() {
            return Err(RepoError::invalid_argument(
                "cannot build candidate query with empty terms",
            ));
        }

        let postings_table = self.table_ref("search_postings");
        let conversation_terms_table = self.table_ref("search_conversation_terms");
        let terms_array_sql = sql_array_strings(terms);
        // Parallel arrays: q_idf[i] is the IDF weight of q_terms[i];
        // unknown terms weigh 0 so `transform` maps them to 0.0.
        let idf_vals: Vec<f64> = terms
            .iter()
            .map(|t| *idf_by_term.get(t).unwrap_or(&0.0))
            .collect();
        let idf_array_sql = sql_array_f64(&idf_vals);
        let (docs_join_sql, prewhere_sql, where_sql) = self.build_conversation_postings_filter_sql(
            terms,
            include_tool_events,
            exclude_codex_mcp,
            from_unix_ms,
            to_unix_ms,
            None,
            None,
        );

        // Optional per-mode filter: join a mode subquery and apply its clause.
        let (mode_join_sql, mode_filter_sql) = if let Some(selected_mode) = mode {
            let mode_subquery = self.mode_subquery();
            let mode_filter_sql = Self::mode_filter_clause(Some(selected_mode))
                .map(|clause| format!("AND {clause}"))
                .unwrap_or_default();
            (
                format!("ANY LEFT JOIN ({mode_subquery}) AS m ON m.session_id = c.session_id"),
                mode_filter_sql,
            )
        } else {
            (String::new(), String::new())
        };

        Ok(format!(
            "WITH
  {terms_array_sql} AS q_terms,
  {idf_array_sql} AS q_idf
SELECT
  c.session_id AS session_id,
  c.score AS score,
  toUInt16(c.matched_terms) AS matched_terms
FROM (
  SELECT
    ct.session_id,
    sum(transform(ct.term, q_terms, q_idf, 0.0) * log1p(toFloat64(ct.tf_sum))) AS score,
    toUInt16(countDistinct(ct.term)) AS matched_terms
  FROM {conversation_terms_table} AS ct
  ANY INNER JOIN (
    SELECT DISTINCT p.session_id
    FROM {postings_table} AS p
    {docs_join_sql}
    PREWHERE {prewhere_sql}
    {where_sql}
  ) AS eligible ON eligible.session_id = ct.session_id
  WHERE ct.term IN {terms_array_sql}
  GROUP BY ct.session_id
) AS c
{mode_join_sql}
WHERE c.matched_terms >= {min_should_match}
  {mode_filter_sql}
ORDER BY c.score DESC, c.session_id ASC
LIMIT {limit}
FORMAT JSONEachRow",
            conversation_terms_table = conversation_terms_table,
            postings_table = postings_table,
            docs_join_sql = docs_join_sql,
            prewhere_sql = prewhere_sql,
            where_sql = where_sql,
            mode_join_sql = mode_join_sql,
            mode_filter_sql = mode_filter_sql,
            min_should_match = min_should_match,
            limit = limit,
        ))
    }
  2000  
    /// Builds the recent-window variant of the conversation-candidate SQL.
    ///
    /// Unlike `build_search_conversation_candidates_sql`, this scores
    /// directly from `search_postings` (not the pre-aggregated terms table)
    /// and restricts documents to those ingested within
    /// `CONVERSATION_RECENT_WINDOW_MS` of now — presumably to catch rows
    /// not yet rolled up into `search_conversation_terms` (TODO confirm
    /// against the ingestion pipeline).
    ///
    /// # Errors
    /// Returns `invalid_argument` when `terms` is empty.
    fn build_search_conversation_recent_candidates_sql(
        &self,
        terms: &[String],
        idf_by_term: &HashMap<String, f64>,
        include_tool_events: bool,
        exclude_codex_mcp: bool,
        min_should_match: u16,
        limit: usize,
        from_unix_ms: Option<i64>,
        to_unix_ms: Option<i64>,
        mode: Option<ConversationMode>,
    ) -> RepoResult<String> {
        if terms.is_empty() {
            return Err(RepoError::invalid_argument(
                "cannot build recent candidate query with empty terms",
            ));
        }

        let postings_table = self.table_ref("search_postings");
        let terms_array_sql = sql_array_strings(terms);
        // Parallel arrays: q_idf[i] weights q_terms[i]; missing terms get 0.
        let idf_vals: Vec<f64> = terms
            .iter()
            .map(|t| *idf_by_term.get(t).unwrap_or(&0.0))
            .collect();
        let idf_array_sql = sql_array_f64(&idf_vals);
        let now_unix_ms = Self::now_unix_ms();
        let recent_floor = now_unix_ms.saturating_sub(CONVERSATION_RECENT_WINDOW_MS);
        // The caller's `from` bound never widens the recent window; the
        // stricter (later) lower bound wins.
        let recent_from_unix_ms = match from_unix_ms {
            Some(from) => from.max(recent_floor),
            None => recent_floor,
        };
        let (docs_join_sql, prewhere_sql, where_sql) = self.build_conversation_postings_filter_sql(
            terms,
            include_tool_events,
            exclude_codex_mcp,
            from_unix_ms,
            to_unix_ms,
            Some(recent_from_unix_ms),
            None,
        );

        // Optional per-mode filter: join a mode subquery and apply its clause.
        let (mode_join_sql, mode_filter_sql) = if let Some(selected_mode) = mode {
            let mode_subquery = self.mode_subquery();
            let mode_filter_sql = Self::mode_filter_clause(Some(selected_mode))
                .map(|clause| format!("AND {clause}"))
                .unwrap_or_default();
            (
                format!("ANY LEFT JOIN ({mode_subquery}) AS m ON m.session_id = c.session_id"),
                mode_filter_sql,
            )
        } else {
            (String::new(), String::new())
        };

        Ok(format!(
            "WITH
  {terms_array_sql} AS q_terms,
  {idf_array_sql} AS q_idf
SELECT
  c.session_id AS session_id,
  c.score AS score,
  toUInt16(c.matched_terms) AS matched_terms
FROM (
  SELECT
    p.session_id AS session_id,
    sum(transform(toString(p.term), q_terms, q_idf, 0.0) * log1p(toFloat64(p.tf))) AS score,
    toUInt16(countDistinct(p.term)) AS matched_terms
  FROM {postings_table} AS p
  {docs_join_sql}
  PREWHERE {prewhere_sql}
  {where_sql}
  GROUP BY p.session_id
) AS c
{mode_join_sql}
WHERE c.matched_terms >= {min_should_match}
  {mode_filter_sql}
ORDER BY c.score DESC, c.session_id ASC
LIMIT {limit}
FORMAT JSONEachRow",
            postings_table = postings_table,
            docs_join_sql = docs_join_sql,
            prewhere_sql = prewhere_sql,
            where_sql = where_sql,
            mode_join_sql = mode_join_sql,
            mode_filter_sql = mode_filter_sql,
            min_should_match = min_should_match,
            limit = limit,
        ))
    }
  2090  
    /// Fetches conversation candidates in two phases and merges them.
    ///
    /// Phase 1 queries the pre-aggregated `search_conversation_terms` table.
    /// If it fills the whole candidate budget the result is returned as
    /// `truncated` and the recent pass is skipped. Otherwise phase 2 scores
    /// the recent ingest window directly from postings, and the two result
    /// sets are merged per session by taking the max score and max
    /// matched-terms count.
    async fn fetch_conversation_candidates(
        &self,
        terms: &[String],
        idf_by_term: &HashMap<String, f64>,
        include_tool_events: bool,
        exclude_codex_mcp: bool,
        min_should_match: u16,
        limit: u16,
        from_unix_ms: Option<i64>,
        to_unix_ms: Option<i64>,
        mode: Option<ConversationMode>,
    ) -> RepoResult<ConversationCandidateSet> {
        let candidate_limit = Self::conversation_candidate_limit(limit);
        let persistent_sql = self.build_search_conversation_candidates_sql(
            terms,
            idf_by_term,
            include_tool_events,
            exclude_codex_mcp,
            min_should_match,
            candidate_limit,
            from_unix_ms,
            to_unix_ms,
            mode,
        )?;
        let mut persistent_rows: Vec<ConversationCandidateRow> =
            self.map_backend(self.ch.query_rows(&persistent_sql, None).await)?;
        // A full page means the LIMIT clause likely cut results: report as
        // truncated and skip the recent pass entirely.
        let truncated = persistent_rows.len() >= candidate_limit;
        if truncated {
            return Ok(ConversationCandidateSet {
                rows: persistent_rows,
                truncated: true,
            });
        }

        let recent_sql = self.build_search_conversation_recent_candidates_sql(
            terms,
            idf_by_term,
            include_tool_events,
            exclude_codex_mcp,
            min_should_match,
            CONVERSATION_RECENT_CANDIDATE_LIMIT,
            from_unix_ms,
            to_unix_ms,
            mode,
        )?;
        let recent_rows: Vec<ConversationCandidateRow> =
            self.map_backend(self.ch.query_rows(&recent_sql, None).await)?;

        // Merge: persistent rows seed the map, recent rows then max-merge
        // score and matched_terms per session (insert is a no-op when the
        // recent row is the first sighting, since it seeds its own values).
        let mut by_session = HashMap::<String, (f64, u16)>::new();
        for row in persistent_rows.drain(..) {
            by_session.insert(row.session_id, (row.score, row.matched_terms));
        }
        for row in recent_rows {
            let entry = by_session
                .entry(row.session_id)
                .or_insert((row.score, row.matched_terms));
            if row.score > entry.0 {
                entry.0 = row.score;
            }
            if row.matched_terms > entry.1 {
                entry.1 = row.matched_terms;
            }
        }

        let mut rows = by_session
            .into_iter()
            .map(
                |(session_id, (score, matched_terms))| ConversationCandidateRow {
                    session_id,
                    score,
                    matched_terms,
                },
            )
            .collect::<Vec<_>>();
        // Rank by score desc, then session_id for a deterministic order.
        rows.sort_by(|a, b| {
            b.score
                .total_cmp(&a.score)
                .then_with(|| a.session_id.cmp(&b.session_id))
        });
        let max_rows = candidate_limit.saturating_add(CONVERSATION_RECENT_CANDIDATE_LIMIT);
        if rows.len() > max_rows {
            rows.truncate(max_rows);
        }

        Ok(ConversationCandidateSet {
            rows,
            truncated: false,
        })
    }
  2180  
    /// Builds the full conversation-search SQL: BM25 is computed per event
    /// from `search_postings`, summed per session, then joined to
    /// `v_session_summary` for time bounds and optionally filtered by mode.
    ///
    /// The distinct-matched-terms count uses one of two strategies:
    /// - <= 63 terms: each term maps to a bit of a u64; `groupBitOr` +
    ///   `bitCount` gives the distinct count cheaply.
    /// - otherwise: per-event distinct term arrays are flattened and
    ///   deduplicated at the session level.
    ///
    /// # Errors
    /// Returns `invalid_argument` when `terms` is empty.
    fn build_search_conversations_sql(
        &self,
        terms: &[String],
        idf_by_term: &HashMap<String, f64>,
        avgdl: f64,
        include_tool_events: bool,
        exclude_codex_mcp: bool,
        min_should_match: u16,
        min_score: f64,
        limit: u16,
        from_unix_ms: Option<i64>,
        to_unix_ms: Option<i64>,
        mode: Option<ConversationMode>,
        candidate_session_ids: Option<&[String]>,
    ) -> RepoResult<String> {
        if terms.is_empty() {
            return Err(RepoError::invalid_argument(
                "cannot build search query with empty terms",
            ));
        }

        let postings_table = self.table_ref("search_postings");
        let session_summary_table = self.table_ref("v_session_summary");
        let terms_array_sql = sql_array_strings(terms);
        // Parallel arrays: q_idf[i] weights q_terms[i]; missing terms get 0.
        let idf_vals: Vec<f64> = terms
            .iter()
            .map(|t| *idf_by_term.get(t).unwrap_or(&0.0))
            .collect();
        let idf_array_sql = sql_array_f64(&idf_vals);
        let (docs_join_sql, prewhere_sql, where_sql) = self.build_conversation_postings_filter_sql(
            terms,
            include_tool_events,
            exclude_codex_mcp,
            from_unix_ms,
            to_unix_ms,
            None,
            candidate_session_ids,
        );
        // Optional per-mode filter: join a mode subquery and apply its clause.
        let (mode_join_sql, mode_filter_sql) = if let Some(selected_mode) = mode {
            let mode_subquery = self.mode_subquery();
            let mode_filter_sql = Self::mode_filter_clause(Some(selected_mode))
                .map(|clause| format!("AND {clause}"))
                .unwrap_or_default();
            (
                format!("ANY LEFT JOIN ({mode_subquery}) AS m ON m.session_id = c.session_id"),
                mode_filter_sql,
            )
        } else {
            (String::new(), String::new())
        };

        // Clamp BM25 parameters into valid ranges regardless of config input.
        let k1 = self.cfg.bm25_k1.max(0.01);
        let b = self.cfg.bm25_b.clamp(0.0, 1.0);
        // Bitmask strategy only when every term fits into a u64 bitset.
        let use_term_bitmask = terms.len() <= 63;
        let term_bits_with_sql = if use_term_bitmask {
            ",\n  arrayMap(idx -> toUInt64(bitShiftLeft(toUInt64(1), idx - 1)), arrayEnumerate(q_terms)) AS q_bits"
                .to_string()
        } else {
            String::new()
        };
        let outer_matched_terms_sql = if use_term_bitmask {
            "bitCount(groupBitOr(e.term_mask))".to_string()
        } else {
            "length(arrayDistinct(arrayFlatten(groupArray(e.matched_terms_arr))))".to_string()
        };
        let inner_matched_terms_sql = if use_term_bitmask {
            "groupBitOr(transform(toString(p.term), q_terms, q_bits, toUInt64(0))) AS term_mask,"
                .to_string()
        } else {
            "groupUniqArray(toString(p.term)) AS matched_terms_arr,".to_string()
        };

        Ok(format!(
            "WITH
  {k1:.6} AS k1,
  {b:.6} AS b,
  greatest({avgdl:.6}, 1.0) AS avgdl,
  {terms_array_sql} AS q_terms,
  {idf_array_sql} AS q_idf{term_bits_with_sql}
SELECT
  c.session_id AS session_id,
  if(s.session_id = '', '', toString(s.first_event_time)) AS first_event_time,
  if(
    s.session_id = '',
    toInt64(0),
    toInt64(toUnixTimestamp64Milli(s.first_event_time))
  ) AS first_event_unix_ms,
  if(s.session_id = '', '', toString(s.last_event_time)) AS last_event_time,
  if(
    s.session_id = '',
    toInt64(0),
    toInt64(toUnixTimestamp64Milli(s.last_event_time))
  ) AS last_event_unix_ms,
  c.provider AS provider,
  c.score AS score,
  toUInt16(c.matched_terms) AS matched_terms,
  toUInt32(c.event_count_considered) AS event_count_considered,
  c.best_event_uid AS best_event_uid
FROM (
  SELECT
    e.session_id AS session_id,
    sum(e.event_score) AS score,
    {outer_matched_terms_sql} AS matched_terms,
    count() AS event_count_considered,
    argMax(e.provider, e.event_score) AS provider,
    argMax(e.event_uid, e.event_score) AS best_event_uid
  FROM (
    SELECT
      p.doc_id AS event_uid,
      any(p.session_id) AS session_id,
      any(p.provider) AS provider,
      {inner_matched_terms_sql}
      sum(
        transform(toString(p.term), q_terms, q_idf, 0.0)
        *
        (
          (toFloat64(p.tf) * (k1 + 1.0))
          /
          (toFloat64(p.tf) + k1 * (1.0 - b + b * (toFloat64(p.doc_len) / avgdl)))
        )
      ) AS event_score
    FROM {postings_table} AS p
    {docs_join_sql}
    PREWHERE {prewhere_sql}
    {where_sql}
    GROUP BY p.doc_id
  ) AS e
  GROUP BY e.session_id
) AS c
ANY LEFT JOIN {session_summary_table} AS s ON s.session_id = c.session_id
{mode_join_sql}
WHERE c.matched_terms >= {min_should_match}
  AND c.score >= {min_score:.6}
  {mode_filter_sql}
ORDER BY c.score DESC, c.session_id ASC
LIMIT {limit}
FORMAT JSONEachRow",
            postings_table = postings_table,
            docs_join_sql = docs_join_sql,
            prewhere_sql = prewhere_sql,
            where_sql = where_sql,
            outer_matched_terms_sql = outer_matched_terms_sql,
            inner_matched_terms_sql = inner_matched_terms_sql,
            term_bits_with_sql = term_bits_with_sql,
            session_summary_table = session_summary_table,
            mode_join_sql = mode_join_sql,
            mode_filter_sql = mode_filter_sql,
            min_should_match = min_should_match,
            min_score = min_score,
            limit = limit,
        ))
    }
  2333  
  2334     async fn fetch_conversation_snippets(
  2335         &self,
  2336         event_uids: &[String],
  2337     ) -> RepoResult<HashMap<String, String>> {
  2338         if event_uids.is_empty() {
  2339             return Ok(HashMap::new());
  2340         }
  2341  
  2342         let documents_table = self.table_ref("search_documents");
  2343         let event_uids_sql = sql_array_strings(event_uids);
  2344         let sql = format!(
  2345             "SELECT
  2346   event_uid,
  2347   leftUTF8(any(text_content), {preview}) AS snippet
  2348 FROM {documents_table}
  2349 WHERE event_uid IN {event_uids_sql}
  2350 GROUP BY event_uid
  2351 FORMAT JSONEachRow",
  2352             preview = self.cfg.preview_chars,
  2353             documents_table = documents_table,
  2354             event_uids_sql = event_uids_sql,
  2355         );
  2356         let rows: Vec<ConversationSnippetRow> =
  2357             self.map_backend(self.ch.query_rows(&sql, None).await)?;
  2358         let mut by_uid = HashMap::new();
  2359         for row in rows {
  2360             by_uid.insert(row.event_uid, row.snippet);
  2361         }
  2362         Ok(by_uid)
  2363     }
  2364  
  2365     async fn load_session_time_bounds(
  2366         &self,
  2367         session_ids: &[String],
  2368     ) -> RepoResult<HashMap<String, SessionTimeBounds>> {
  2369         let mut unique_session_ids = session_ids.to_vec();
  2370         unique_session_ids.sort_unstable();
  2371         unique_session_ids.dedup();
  2372         if unique_session_ids.is_empty() {
  2373             return Ok(HashMap::new());
  2374         }
  2375  
  2376         let session_summary_table = self.table_ref("v_session_summary");
  2377         let session_ids_sql = sql_array_strings(&unique_session_ids);
  2378         let sql = format!(
  2379             "SELECT
  2380   s.session_id AS session_id,
  2381   toString(s.first_event_time) AS first_event_time,
  2382   toString(s.last_event_time) AS last_event_time
  2383 FROM {session_summary_table} AS s
  2384 WHERE s.session_id IN {session_ids_sql}
  2385 FORMAT JSONEachRow",
  2386             session_summary_table = session_summary_table,
  2387             session_ids_sql = session_ids_sql,
  2388         );
  2389  
  2390         let rows: Vec<SessionTimeBoundsRow> =
  2391             match self.map_backend(self.ch.query_rows(&sql, None).await) {
  2392                 Ok(rows) => rows,
  2393                 Err(err) => {
  2394                     warn!("failed to load session time bounds: {}", err);
  2395                     return Ok(HashMap::new());
  2396                 }
  2397             };
  2398         let mut bounds_by_session = HashMap::new();
  2399         for row in rows {
  2400             bounds_by_session.insert(
  2401                 row.session_id,
  2402                 SessionTimeBounds {
  2403                     first_event_time: row.first_event_time,
  2404                     last_event_time: row.last_event_time,
  2405                 },
  2406             );
  2407         }
  2408         Ok(bounds_by_session)
  2409     }
  2410  
  2411     async fn map_search_rows_to_hits(
  2412         &self,
  2413         rows: Vec<SearchRow>,
  2414     ) -> RepoResult<Vec<SearchEventHit>> {
  2415         let session_ids = rows
  2416             .iter()
  2417             .map(|row| row.session_id.clone())
  2418             .collect::<Vec<_>>();
  2419         let session_time_bounds = self.load_session_time_bounds(&session_ids).await?;
  2420  
  2421         Ok(rows
  2422             .into_iter()
  2423             .enumerate()
  2424             .map(|(idx, row)| {
  2425                 let session_id = row.session_id;
  2426                 let (first_event_time, last_event_time) = session_time_bounds
  2427                     .get(session_id.as_str())
  2428                     .map(|bounds| {
  2429                         (
  2430                             bounds.first_event_time.clone(),
  2431                             bounds.last_event_time.clone(),
  2432                         )
  2433                     })
  2434                     .unwrap_or_default();
  2435  
  2436                 SearchEventHit {
  2437                     rank: idx + 1,
  2438                     event_uid: row.event_uid,
  2439                     session_id,
  2440                     first_event_time,
  2441                     last_event_time,
  2442                     source_name: row.source_name,
  2443                     provider: row.provider,
  2444                     score: row.score,
  2445                     matched_terms: row.matched_terms,
  2446                     doc_len: row.doc_len,
  2447                     event_class: row.event_class,
  2448                     payload_type: row.payload_type,
  2449                     actor_role: row.actor_role,
  2450                     name: row.name,
  2451                     phase: row.phase,
  2452                     source_ref: row.source_ref,
  2453                     text_preview: row.text_preview,
  2454                 }
  2455             })
  2456             .collect())
  2457     }
  2458  
  2459     async fn fetch_conversation_session_metadata(
  2460         &self,
  2461         session_ids: &[String],
  2462     ) -> RepoResult<HashMap<String, ConversationSessionMetadataRow>> {
  2463         if session_ids.is_empty() {
  2464             return Ok(HashMap::new());
  2465         }
  2466  
  2467         let events_table = self.table_ref("events");
  2468         let session_ids_sql = sql_array_strings(session_ids);
  2469         let sql = format!(
  2470             "SELECT
  2471   session_id,
  2472   argMax(provider, event_ts) AS provider,
  2473   ifNull(argMax(nullIf(JSONExtractString(payload_json, 'slug'), ''), event_ts), '') AS session_slug,
  2474   ifNull(
  2475     argMax(
  2476       coalesce(
  2477         nullIf(JSONExtractString(payload_json, 'summary'), ''),
  2478         nullIf(JSONExtractString(payload_json, 'title'), ''),
  2479         nullIf(JSONExtractString(payload_json, 'name'), '')
  2480       ),
  2481       event_ts
  2482     ),
  2483     ''
  2484   ) AS session_summary
  2485 FROM {events_table}
  2486 WHERE event_kind = 'session_meta'
  2487   AND session_id IN {session_ids_sql}
  2488 GROUP BY session_id
  2489 FORMAT JSONEachRow",
  2490             events_table = events_table,
  2491             session_ids_sql = session_ids_sql,
  2492         );
  2493  
  2494         let rows: Vec<ConversationSessionMetadataRow> =
  2495             self.map_backend(self.ch.query_rows(&sql, None).await)?;
  2496         let mut by_session = HashMap::new();
  2497         for row in rows {
  2498             by_session.insert(row.session_id.clone(), row);
  2499         }
  2500         Ok(by_session)
  2501     }
  2502  
  2503     async fn log_search_events(
  2504         &self,
  2505         query_id: &str,
  2506         source: &str,
  2507         raw_query: &str,
  2508         session_hint: &str,
  2509         terms: &[String],
  2510         limit: u16,
  2511         min_should_match: u16,
  2512         min_score: f64,
  2513         include_tool_events: bool,
  2514         event_kinds: Option<&[SearchEventKind]>,
  2515         exclude_codex_mcp: bool,
  2516         took_ms: u32,
  2517         hits: &[SearchEventHit],
  2518         docs: u64,
  2519         avgdl: f64,
  2520     ) {
  2521         let event_kinds = event_kinds
  2522             .map(|kinds| kinds.iter().map(|kind| kind.as_str()).collect::<Vec<_>>())
  2523             .unwrap_or_default();
  2524         let metadata_json = match serde_json::to_string(&json!({
  2525             "docs": docs,
  2526             "avgdl": avgdl,
  2527             "k1": self.cfg.bm25_k1,
  2528             "b": self.cfg.bm25_b,
  2529             "event_kinds": event_kinds
  2530         })) {
  2531             Ok(value) => value,
  2532             Err(err) => {
  2533                 warn!("failed to encode search metadata: {}", err);
  2534                 "{}".to_string()
  2535             }
  2536         };
  2537  
  2538         let query_row = json!({
  2539             "query_id": query_id,
  2540             "source": source,
  2541             "session_hint": session_hint,
  2542             "raw_query": raw_query,
  2543             "normalized_terms": terms,
  2544             "term_count": terms.len() as u16,
  2545             "result_limit": limit,
  2546             "min_should_match": min_should_match,
  2547             "min_score": min_score,
  2548             "include_tool_events": if include_tool_events { 1 } else { 0 },
  2549             "exclude_codex_mcp": if exclude_codex_mcp { 1 } else { 0 },
  2550             "response_ms": took_ms,
  2551             "result_count": hits.len() as u16,
  2552             "metadata_json": metadata_json,
  2553         });
  2554  
  2555         let hit_rows: Vec<Value> = hits
  2556             .iter()
  2557             .map(|hit| {
  2558                 json!({
  2559                     "query_id": query_id,
  2560                     "rank": hit.rank as u16,
  2561                     "event_uid": hit.event_uid,
  2562                     "session_id": hit.session_id,
  2563                     "source_name": hit.source_name,
  2564                     "provider": hit.provider,
  2565                     "score": hit.score,
  2566                     "matched_terms": hit.matched_terms as u16,
  2567                     "doc_len": hit.doc_len,
  2568                     "event_class": hit.event_class,
  2569                     "payload_type": hit.payload_type,
  2570                     "actor_role": hit.actor_role,
  2571                     "name": hit.name,
  2572                     "source_ref": hit.source_ref,
  2573                 })
  2574             })
  2575             .collect();
  2576  
  2577         let ch = self.ch.clone();
  2578         if self.cfg.async_log_writes {
  2579             tokio::spawn(async move {
  2580                 if let Err(err) = ch.insert_json_rows("search_query_log", &[query_row]).await {
  2581                     warn!("failed to write search_query_log: {}", err);
  2582                 }
  2583                 if !hit_rows.is_empty() {
  2584                     if let Err(err) = ch.insert_json_rows("search_hit_log", &hit_rows).await {
  2585                         warn!("failed to write search_hit_log: {}", err);
  2586                     }
  2587                 }
  2588             });
  2589         } else {
  2590             if let Err(err) = self
  2591                 .ch
  2592                 .insert_json_rows("search_query_log", &[query_row])
  2593                 .await
  2594             {
  2595                 warn!("failed to write search_query_log: {}", err);
  2596             }
  2597             if !hit_rows.is_empty() {
  2598                 if let Err(err) = self.ch.insert_json_rows("search_hit_log", &hit_rows).await {
  2599                     warn!("failed to write search_hit_log: {}", err);
  2600                 }
  2601             }
  2602         }
  2603     }
  2604 }
  2605  
  2606 #[async_trait]
  2607 impl ConversationRepository for ClickHouseConversationRepository {
  2608     async fn list_conversations(
  2609         &self,
  2610         filter: ConversationListFilter,
  2611         page: PageRequest,
  2612     ) -> RepoResult<Page<ConversationSummary>> {
  2613         Self::validate_time_bounds(filter.from_unix_ms, filter.to_unix_ms)?;
  2614  
  2615         let limit = page.normalized_limit(self.cfg.max_results);
  2616         let filter_sig = Self::conversation_filter_sig(&filter);
  2617  
  2618         let cursor = if let Some(token) = page.cursor.as_deref() {
  2619             let cursor: ConversationCursor = decode_cursor(token)?;
  2620             if cursor.filter_sig != filter_sig {
  2621                 return Err(RepoError::invalid_cursor(
  2622                     "cursor does not match current conversation filter",
  2623                 ));
  2624             }
  2625             Some(cursor)
  2626         } else {
  2627             None
  2628         };
  2629  
  2630         let session_summary = self.table_ref("v_session_summary");
  2631         let mode_subquery = self.mode_subquery();
  2632  
  2633         let mut where_clauses = vec!["1 = 1".to_string()];
  2634  
  2635         if let Some(from_unix_ms) = filter.from_unix_ms {
  2636             where_clauses.push(format!(
  2637                 "toUnixTimestamp64Milli(s.last_event_time) >= {from_unix_ms}"
  2638             ));
  2639         }
  2640         if let Some(to_unix_ms) = filter.to_unix_ms {
  2641             where_clauses.push(format!(
  2642                 "toUnixTimestamp64Milli(s.last_event_time) < {to_unix_ms}"
  2643             ));
  2644         }
  2645         if let Some(mode_clause) = Self::mode_filter_clause(filter.mode) {
  2646             where_clauses.push(mode_clause);
  2647         }
  2648  
  2649         if let Some(cursor) = &cursor {
  2650             where_clauses.push(format!(
  2651                 "(toUnixTimestamp64Milli(s.last_event_time) < {} OR (toUnixTimestamp64Milli(s.last_event_time) = {} AND s.session_id < {}))",
  2652                 cursor.last_event_unix_ms,
  2653                 cursor.last_event_unix_ms,
  2654                 sql_quote(&cursor.session_id)
  2655             ));
  2656         }
  2657  
  2658         let where_sql = where_clauses.join("\n  AND ");
  2659  
  2660         let query = format!(
  2661             "SELECT
  2662   s.session_id,
  2663   toString(s.first_event_time) AS first_event_time,
  2664   toInt64(toUnixTimestamp64Milli(s.first_event_time)) AS first_event_unix_ms,
  2665   toString(s.last_event_time) AS last_event_time,
  2666   toInt64(toUnixTimestamp64Milli(s.last_event_time)) AS last_event_unix_ms,
  2667   toUInt32(s.total_turns) AS total_turns,
  2668   toUInt64(s.total_events) AS total_events,
  2669   toUInt64(s.user_messages) AS user_messages,
  2670   toUInt64(s.assistant_messages) AS assistant_messages,
  2671   toUInt64(s.tool_calls) AS tool_calls,
  2672   toUInt64(s.tool_results) AS tool_results,
  2673   ifNull(m.mode, 'chat') AS mode
  2674 FROM {session_summary} AS s
  2675 LEFT JOIN ({mode_subquery}) AS m ON m.session_id = s.session_id
  2676 WHERE {where_sql}
  2677 ORDER BY s.last_event_time DESC, s.session_id DESC
  2678 LIMIT {limit_plus}
  2679 FORMAT JSONEachRow",
  2680             session_summary = session_summary,
  2681             mode_subquery = mode_subquery,
  2682             where_sql = where_sql,
  2683             limit_plus = (limit as usize) + 1,
  2684         );
  2685  
  2686         let rows: Vec<ConversationSummaryRow> =
  2687             self.map_backend(self.ch.query_rows(&query, None).await)?;
  2688  
  2689         let mut items: Vec<ConversationSummary> = rows
  2690             .iter()
  2691             .take(limit as usize)
  2692             .cloned()
  2693             .map(Self::map_conversation_row)
  2694             .collect();
  2695  
  2696         let next_cursor = if rows.len() > limit as usize {
  2697             if let Some(last) = items.last() {
  2698                 Some(encode_cursor(&ConversationCursor {
  2699                     last_event_unix_ms: last.last_event_unix_ms,
  2700                     session_id: last.session_id.clone(),
  2701                     filter_sig,
  2702                 })?)
  2703             } else {
  2704                 None
  2705             }
  2706         } else {
  2707             None
  2708         };
  2709  
  2710         Ok(Page {
  2711             items: std::mem::take(&mut items),
  2712             next_cursor,
  2713         })
  2714     }
  2715  
  2716     async fn get_conversation(
  2717         &self,
  2718         session_id: &str,
  2719         opts: ConversationDetailOptions,
  2720     ) -> RepoResult<Option<Conversation>> {
  2721         Self::validate_session_id(session_id)?;
  2722  
  2723         let Some(summary) = self.load_conversation_summary(session_id).await? else {
  2724             return Ok(None);
  2725         };
  2726  
  2727         let turns = if opts.include_turns {
  2728             self.load_turns_for_session(session_id).await?
  2729         } else {
  2730             Vec::new()
  2731         };
  2732  
  2733         Ok(Some(Conversation { summary, turns }))
  2734     }
  2735  
  2736     async fn list_turns(
  2737         &self,
  2738         session_id: &str,
  2739         filter: TurnListFilter,
  2740         page: PageRequest,
  2741     ) -> RepoResult<Page<TurnSummary>> {
  2742         Self::validate_session_id(session_id)?;
  2743  
  2744         let limit = page.normalized_limit(self.cfg.max_results);
  2745         let filter_sig = Self::turn_filter_sig(session_id, &filter);
  2746  
  2747         let cursor = if let Some(token) = page.cursor.as_deref() {
  2748             let cursor: TurnCursor = decode_cursor(token)?;
  2749             if cursor.filter_sig != filter_sig {
  2750                 return Err(RepoError::invalid_cursor(
  2751                     "cursor does not match current turn filter",
  2752                 ));
  2753             }
  2754             Some(cursor)
  2755         } else {
  2756             None
  2757         };
  2758  
  2759         let turn_summary = self.table_ref("v_turn_summary");
  2760         let mut where_clauses = vec![format!("session_id = {}", sql_quote(session_id))];
  2761  
  2762         if let Some(from_turn_seq) = filter.from_turn_seq {
  2763             where_clauses.push(format!("turn_seq >= {from_turn_seq}"));
  2764         }
  2765         if let Some(to_turn_seq) = filter.to_turn_seq {
  2766             where_clauses.push(format!("turn_seq <= {to_turn_seq}"));
  2767         }
  2768  
  2769         if let Some(cursor) = &cursor {
  2770             if cursor.session_id != session_id {
  2771                 return Err(RepoError::invalid_cursor(
  2772                     "cursor session_id does not match requested session_id",
  2773                 ));
  2774             }
  2775             where_clauses.push(format!("turn_seq > {}", cursor.last_turn_seq));
  2776         }
  2777  
  2778         let where_sql = where_clauses.join("\n  AND ");
  2779         let query = format!(
  2780             "SELECT
  2781   session_id,
  2782   toUInt32(turn_seq) AS turn_seq,
  2783   ifNull(turn_id, '') AS turn_id,
  2784   toString(started_at) AS started_at,
  2785   toInt64(toUnixTimestamp64Milli(started_at)) AS started_at_unix_ms,
  2786   toString(ended_at) AS ended_at,
  2787   toInt64(toUnixTimestamp64Milli(ended_at)) AS ended_at_unix_ms,
  2788   toUInt64(total_events) AS total_events,
  2789   toUInt64(user_messages) AS user_messages,
  2790   toUInt64(assistant_messages) AS assistant_messages,
  2791   toUInt64(tool_calls) AS tool_calls,
  2792   toUInt64(tool_results) AS tool_results,
  2793   toUInt64(reasoning_items) AS reasoning_items
  2794 FROM {turn_summary}
  2795 WHERE {where_sql}
  2796 ORDER BY turn_seq ASC
  2797 LIMIT {limit_plus}
  2798 FORMAT JSONEachRow",
  2799             turn_summary = turn_summary,
  2800             where_sql = where_sql,
  2801             limit_plus = (limit as usize) + 1,
  2802         );
  2803  
  2804         let rows: Vec<TurnSummaryRow> = self.map_backend(self.ch.query_rows(&query, None).await)?;
  2805         let items: Vec<TurnSummary> = rows
  2806             .iter()
  2807             .take(limit as usize)
  2808             .cloned()
  2809             .map(Self::map_turn_row)
  2810             .collect();
  2811  
  2812         let next_cursor = if rows.len() > limit as usize {
  2813             if let Some(last) = items.last() {
  2814                 Some(encode_cursor(&TurnCursor {
  2815                     last_turn_seq: last.turn_seq,
  2816                     session_id: session_id.to_string(),
  2817                     filter_sig,
  2818                 })?)
  2819             } else {
  2820                 None
  2821             }
  2822         } else {
  2823             None
  2824         };
  2825  
  2826         Ok(Page { items, next_cursor })
  2827     }
  2828  
  2829     async fn get_turn(&self, session_id: &str, turn_seq: u32) -> RepoResult<Option<Turn>> {
  2830         Self::validate_session_id(session_id)?;
  2831  
  2832         let turn_summary = self.table_ref("v_turn_summary");
  2833         let summary_query = format!(
  2834             "SELECT
  2835   session_id,
  2836   toUInt32(turn_seq) AS turn_seq,
  2837   ifNull(turn_id, '') AS turn_id,
  2838   toString(started_at) AS started_at,
  2839   toInt64(toUnixTimestamp64Milli(started_at)) AS started_at_unix_ms,
  2840   toString(ended_at) AS ended_at,
  2841   toInt64(toUnixTimestamp64Milli(ended_at)) AS ended_at_unix_ms,
  2842   toUInt64(total_events) AS total_events,
  2843   toUInt64(user_messages) AS user_messages,
  2844   toUInt64(assistant_messages) AS assistant_messages,
  2845   toUInt64(tool_calls) AS tool_calls,
  2846   toUInt64(tool_results) AS tool_results,
  2847   toUInt64(reasoning_items) AS reasoning_items
  2848 FROM {turn_summary}
  2849 WHERE session_id = {} AND turn_seq = {}
  2850 LIMIT 1
  2851 FORMAT JSONEachRow",
  2852             sql_quote(session_id),
  2853             turn_seq,
  2854         );
  2855  
  2856         let rows: Vec<TurnSummaryRow> =
  2857             self.map_backend(self.ch.query_rows(&summary_query, None).await)?;
  2858         let Some(summary_row) = rows.into_iter().next() else {
  2859             return Ok(None);
  2860         };
  2861  
  2862         let trace_table = self.table_ref("v_conversation_trace");
  2863         let events_query = format!(
  2864             "SELECT
  2865   session_id,
  2866   event_uid,
  2867   toUInt64(event_order) AS event_order,
  2868   toUInt32(turn_seq) AS turn_seq,
  2869   toString(event_time) AS event_time,
  2870   actor_role,
  2871   event_class,
  2872   payload_type,
  2873   call_id,
  2874   name,
  2875   phase,
  2876   item_id,
  2877   source_ref,
  2878   text_content,
  2879   payload_json,
  2880   token_usage_json
  2881 FROM {trace_table}
  2882 WHERE session_id = {} AND turn_seq = {}
  2883 ORDER BY event_order ASC
  2884 FORMAT JSONEachRow",
  2885             sql_quote(session_id),
  2886             turn_seq,
  2887         );
  2888  
  2889         let event_rows: Vec<TraceEventRow> =
  2890             self.map_backend(self.ch.query_rows(&events_query, None).await)?;
  2891         let events = event_rows.into_iter().map(Self::map_trace_event).collect();
  2892  
  2893         Ok(Some(Turn {
  2894             summary: Self::map_turn_row(summary_row),
  2895             events,
  2896         }))
  2897     }
  2898  
    /// Loads a window of conversation-trace events around a target event.
    ///
    /// Resolves `req.event_uid` to its `(session_id, event_order)` position,
    /// then fetches up to `before` preceding events, the target row itself,
    /// and up to `after` following events from the same session, all ordered
    /// by `event_order`. When the uid is unknown, returns an `OpenContext`
    /// with `found: false` and empty fields rather than an error.
    ///
    /// System-event filtering happens twice: once in SQL (via
    /// `open_context_filter_clause`) and once client-side
    /// (`is_low_information_system_event`). NOTE(review): because the
    /// client-side `retain` runs after the SQL `LIMIT`, the returned window
    /// can contain fewer than `before`/`after` context events.
    async fn open_event(&self, req: OpenEventRequest) -> RepoResult<OpenContext> {
        let event_uid = req.event_uid.trim();
        if event_uid.is_empty() {
            return Err(RepoError::invalid_argument("event_uid cannot be empty"));
        }
        Self::validate_event_uid(event_uid)?;

        // Fall back to configured context-window defaults when unspecified.
        let before = req.before.unwrap_or(self.cfg.default_context_before);
        let after = req.after.unwrap_or(self.cfg.default_context_after);
        let include_system_events = req.include_system_events.unwrap_or(false);
        let context_filter = Self::open_context_filter_clause(include_system_events);
        let trace_table = self.table_ref("v_conversation_trace");

        // Locate the target event. If the uid appears more than once, the
        // highest event_order wins (ORDER BY event_order DESC LIMIT 1).
        let target_query = format!(
            "SELECT session_id, event_order, turn_seq FROM {trace_table} WHERE event_uid = {} ORDER BY event_order DESC LIMIT 1 FORMAT JSONEachRow",
            sql_quote(event_uid)
        );

        let targets: Vec<OpenTargetRow> =
            self.map_backend(self.ch.query_rows(&target_query, None).await)?;
        let Some(target) = targets.first() else {
            // Unknown uid: report not-found instead of erroring.
            return Ok(OpenContext {
                found: false,
                event_uid: event_uid.to_string(),
                session_id: String::new(),
                target_event_order: 0,
                turn_seq: 0,
                before,
                after,
                events: Vec::new(),
            });
        };

        // Full row for the target itself, pinned by session + order + uid.
        let target_row_query = format!(
            "SELECT
  session_id,
  event_uid,
  toUInt64(event_order) AS event_order,
  toUInt32(turn_seq) AS turn_seq,
  toString(event_time) AS event_time,
  actor_role,
  event_class,
  payload_type,
  call_id,
  name,
  phase,
  item_id,
  source_ref,
  text_content,
  payload_json,
  token_usage_json
FROM {trace_table}
WHERE session_id = {} AND event_order = {} AND event_uid = {}
ORDER BY event_order ASC
FORMAT JSONEachRow",
            sql_quote(&target.session_id),
            target.event_order,
            sql_quote(event_uid),
        );

        // Preceding context: fetched newest-first (ORDER BY DESC) so LIMIT
        // takes the rows closest to the target; reversed below into
        // chronological order.
        let mut before_rows: Vec<TraceEventRow> = if before == 0 {
            Vec::new()
        } else {
            let before_query = format!(
                "SELECT
  session_id,
  event_uid,
  toUInt64(event_order) AS event_order,
  toUInt32(turn_seq) AS turn_seq,
  toString(event_time) AS event_time,
  actor_role,
  event_class,
  payload_type,
  call_id,
  name,
  phase,
  item_id,
  source_ref,
  text_content,
  payload_json,
  token_usage_json
FROM {trace_table}
WHERE session_id = {} AND event_order < {}{}
ORDER BY event_order DESC
LIMIT {}
FORMAT JSONEachRow",
                sql_quote(&target.session_id),
                target.event_order,
                context_filter,
                before,
            );

            self.map_backend(self.ch.query_rows(&before_query, None).await)?
        };
        if !include_system_events {
            // Second-pass filter beyond the SQL clause (see doc comment).
            before_rows.retain(|row| {
                !Self::is_low_information_system_event(&row.actor_role, &row.payload_type)
            });
        }
        before_rows.reverse();

        let target_rows: Vec<TraceEventRow> =
            self.map_backend(self.ch.query_rows(&target_row_query, None).await)?;

        // Following context: already in chronological (ASC) order.
        let mut after_rows: Vec<TraceEventRow> = if after == 0 {
            Vec::new()
        } else {
            let after_query = format!(
                "SELECT
  session_id,
  event_uid,
  toUInt64(event_order) AS event_order,
  toUInt32(turn_seq) AS turn_seq,
  toString(event_time) AS event_time,
  actor_role,
  event_class,
  payload_type,
  call_id,
  name,
  phase,
  item_id,
  source_ref,
  text_content,
  payload_json,
  token_usage_json
FROM {trace_table}
WHERE session_id = {} AND event_order > {}{}
ORDER BY event_order ASC
LIMIT {}
FORMAT JSONEachRow",
                sql_quote(&target.session_id),
                target.event_order,
                context_filter,
                after,
            );

            self.map_backend(self.ch.query_rows(&after_query, None).await)?
        };
        if !include_system_events {
            after_rows.retain(|row| {
                !Self::is_low_information_system_event(&row.actor_role, &row.payload_type)
            });
        }

        // Stitch the window together: before + target + after, chronological.
        let mut rows = Vec::with_capacity(before_rows.len() + target_rows.len() + after_rows.len());
        rows.extend(before_rows);
        rows.extend(target_rows);
        rows.extend(after_rows);

        let events: Vec<OpenEvent> = rows
            .into_iter()
            .map(|row| OpenEvent {
                // Mark the target by uid so callers can highlight it.
                is_target: row.event_uid == event_uid,
                session_id: row.session_id,
                event_uid: row.event_uid,
                event_order: row.event_order,
                turn_seq: row.turn_seq,
                event_time: row.event_time,
                actor_role: row.actor_role,
                event_class: row.event_class,
                payload_type: row.payload_type,
                call_id: row.call_id,
                name: row.name,
                phase: row.phase,
                item_id: row.item_id,
                source_ref: row.source_ref,
                text_content: row.text_content,
                payload_json: row.payload_json,
                token_usage_json: row.token_usage_json,
            })
            .collect();

        Ok(OpenContext {
            found: true,
            event_uid: event_uid.to_string(),
            session_id: target.session_id.clone(),
            target_event_order: target.event_order,
            turn_seq: target.turn_seq,
            before,
            after,
            events,
        })
    }
  3082  
  3083     async fn search_events(&self, query: SearchEventsQuery) -> RepoResult<SearchEventsResult> {
  3084         let query_text = query.query.trim();
  3085         if query_text.is_empty() {
  3086             return Err(RepoError::invalid_argument("query cannot be empty"));
  3087         }
  3088         let source = query
  3089             .source
  3090             .as_deref()
  3091             .map(str::trim)
  3092             .filter(|raw| !raw.is_empty())
  3093             .unwrap_or("moraine-conversations");
  3094  
  3095         let query_id = if source == BENCHMARK_REPLAY_SOURCE {
  3096             "benchmark-replay".to_string()
  3097         } else {
  3098             Uuid::new_v4().to_string()
  3099         };
  3100         let started = Instant::now();
  3101  
  3102         let terms_with_qf = tokenize_query(query_text, self.cfg.bm25_max_query_terms);
  3103         if terms_with_qf.is_empty() {
  3104             return Err(RepoError::invalid_argument(
  3105                 "query has no searchable terms (tokens shorter than 2 characters are excluded)",
  3106             ));
  3107         }
  3108         let terms: Vec<String> = terms_with_qf.iter().map(|(term, _)| term.clone()).collect();
  3109  
  3110         let requested_limit = query.limit.unwrap_or(self.cfg.max_results).max(1);
  3111         let limit = requested_limit.min(self.cfg.max_results);
  3112         let limit_capped = requested_limit > limit;
  3113  
  3114         let min_should_match = query
  3115             .min_should_match
  3116             .unwrap_or(self.cfg.bm25_default_min_should_match)
  3117             .max(1)
  3118             .min(terms.len() as u16);
  3119  
  3120         let min_score = query.min_score.unwrap_or(self.cfg.bm25_default_min_score);
  3121         let include_tool_events = query
  3122             .include_tool_events
  3123             .unwrap_or(self.cfg.default_include_tool_events);
  3124         let event_kinds = Self::normalize_event_kinds(query.event_kinds)?;
  3125         let exclude_codex_mcp = query
  3126             .exclude_codex_mcp
  3127             .unwrap_or(self.cfg.default_exclude_codex_mcp);
  3128         let disable_cache = query.disable_cache.unwrap_or(false);
  3129         let effective_strategy = query.search_strategy.unwrap_or_default();
  3130  
  3131         if let Some(session_id) = query.session_id.as_deref() {
  3132             Self::validate_session_id(session_id)?;
  3133         }
  3134         let session_id = query.session_id.as_deref();
  3135  
  3136         let (docs, total_doc_len) = self.corpus_stats().await?;
  3137         if docs == 0 {
  3138             return Ok(SearchEventsResult {
  3139                 query_id,
  3140                 query: query_text.to_string(),
  3141                 terms,
  3142                 stats: SearchEventsStats {
  3143                     docs: 0,
  3144                     avgdl: 0.0,
  3145                     took_ms: started.elapsed().as_millis() as u32,
  3146                     result_count: 0,
  3147                     requested_limit,
  3148                     effective_limit: limit,
  3149                     limit_capped,
  3150                 },
  3151                 hits: Vec::new(),
  3152             });
  3153         }
  3154  
  3155         let avgdl = (total_doc_len as f64 / docs as f64).max(1.0);
  3156         let fetch_limit = Self::dedupe_fetch_limit(limit);
  3157  
  3158         let hits = if disable_cache {
  3159             let rows = self
  3160                 .search_events_rows_by_strategy(
  3161                     effective_strategy,
  3162                     &terms,
  3163                     docs,
  3164                     avgdl,
  3165                     include_tool_events,
  3166                     event_kinds.as_deref(),
  3167                     exclude_codex_mcp,
  3168                     session_id,
  3169                     min_should_match,
  3170                     min_score,
  3171                     fetch_limit,
  3172                 )
  3173                 .await?;
  3174             let rows = Self::dedupe_search_rows(rows, limit);
  3175             self.map_search_rows_to_hits(rows).await?
  3176         } else {
  3177             let cache_key = Self::search_events_cache_key(
  3178                 &terms,
  3179                 effective_strategy,
  3180                 include_tool_events,
  3181                 event_kinds.as_deref(),
  3182                 exclude_codex_mcp,
  3183                 session_id,
  3184                 min_should_match,
  3185                 min_score,
  3186                 limit,
  3187             );
  3188  
  3189             if let Some(cached_hits) = self.search_events_cache_get(&cache_key).await {
  3190                 cached_hits
  3191             } else {
  3192                 let fresh_rows = self
  3193                     .search_events_rows_by_strategy(
  3194                         effective_strategy,
  3195                         &terms,
  3196                         docs,
  3197                         avgdl,
  3198                         include_tool_events,
  3199                         event_kinds.as_deref(),
  3200                         exclude_codex_mcp,
  3201                         session_id,
  3202                         min_should_match,
  3203                         min_score,
  3204                         fetch_limit,
  3205                     )
  3206                     .await?;
  3207                 let fresh_rows = Self::dedupe_search_rows(fresh_rows, limit);
  3208                 let fresh_hits = self.map_search_rows_to_hits(fresh_rows).await?;
  3209                 self.search_events_cache_put(cache_key, &fresh_hits).await;
  3210                 fresh_hits
  3211             }
  3212         };
  3213  
  3214         let took_ms = started.elapsed().as_millis() as u32;
  3215  
  3216         if source != BENCHMARK_REPLAY_SOURCE {
  3217             self.log_search_events(
  3218                 &query_id,
  3219                 source,
  3220                 query_text,
  3221                 session_id.unwrap_or(""),
  3222                 &terms,
  3223                 limit,
  3224                 min_should_match,
  3225                 min_score,
  3226                 include_tool_events,
  3227                 event_kinds.as_deref(),
  3228                 exclude_codex_mcp,
  3229                 took_ms,
  3230                 &hits,
  3231                 docs,
  3232                 avgdl,
  3233             )
  3234             .await;
  3235         }
  3236  
  3237         Ok(SearchEventsResult {
  3238             query_id,
  3239             query: query_text.to_string(),
  3240             terms,
  3241             stats: SearchEventsStats {
  3242                 docs,
  3243                 avgdl,
  3244                 took_ms,
  3245                 result_count: hits.len(),
  3246                 requested_limit,
  3247                 effective_limit: limit,
  3248                 limit_capped,
  3249             },
  3250             hits,
  3251         })
  3252     }
  3253  
    /// BM25-style full-text search that ranks whole conversations (sessions)
    /// rather than individual events.
    ///
    /// Pipeline: validate and tokenize the query, short-circuit on an empty
    /// corpus, compute per-term IDF from document frequencies, optionally
    /// prefilter candidate sessions, run a single scoring query, then enrich
    /// the result rows with snippets and session metadata.
    ///
    /// # Errors
    /// Returns `invalid_argument` when the query is empty, produces no
    /// searchable terms, or has invalid time bounds; backend failures are
    /// mapped through `map_backend`.
    async fn search_conversations(
        &self,
        query: ConversationSearchQuery,
    ) -> RepoResult<ConversationSearchResults> {
        let query_text = query.query.trim();
        if query_text.is_empty() {
            return Err(RepoError::invalid_argument("query cannot be empty"));
        }

        Self::validate_time_bounds(query.from_unix_ms, query.to_unix_ms)?;

        // Fresh id + timer for this query; `took_ms` in the stats is measured
        // from here.
        let query_id = Uuid::new_v4().to_string();
        let started = Instant::now();

        // Tokenization lowercases and caps the number of distinct terms;
        // tokens under 2 chars are dropped (hence the error message below).
        let terms_with_qf = tokenize_query(query_text, self.cfg.bm25_max_query_terms);
        if terms_with_qf.is_empty() {
            return Err(RepoError::invalid_argument(
                "query has no searchable terms (tokens shorter than 2 characters are excluded)",
            ));
        }
        let terms: Vec<String> = terms_with_qf.iter().map(|(term, _)| term.clone()).collect();

        // Clamp the requested limit to the configured maximum and remember
        // whether we capped it so callers can see that in the stats.
        let requested_limit = query.limit.unwrap_or(self.cfg.max_results).max(1);
        let limit = requested_limit.min(self.cfg.max_results);
        let limit_capped = requested_limit > limit;

        // min_should_match is clamped to [1, number of query terms].
        let min_should_match = query
            .min_should_match
            .unwrap_or(self.cfg.bm25_default_min_should_match)
            .max(1)
            .min(terms.len() as u16);

        let min_score = query.min_score.unwrap_or(self.cfg.bm25_default_min_score);
        let include_tool_events = query
            .include_tool_events
            .unwrap_or(self.cfg.default_include_tool_events);
        let exclude_codex_mcp = query
            .exclude_codex_mcp
            .unwrap_or(self.cfg.default_exclude_codex_mcp);

        // Empty corpus: return an empty, well-formed result instead of
        // running any further queries.
        let (docs, total_doc_len) = self.corpus_stats().await?;
        if docs == 0 {
            return Ok(ConversationSearchResults {
                query_id,
                query: query_text.to_string(),
                terms,
                stats: ConversationSearchStats {
                    docs: 0,
                    avgdl: 0.0,
                    took_ms: started.elapsed().as_millis() as u32,
                    result_count: 0,
                    requested_limit,
                    effective_limit: limit,
                    limit_capped,
                },
                hits: Vec::new(),
            });
        }

        // Average document length, floored at 1 to keep BM25 length
        // normalization well-defined.
        let avgdl = (total_doc_len as f64 / docs as f64).max(1.0);
        let df_map = self.df_map(&terms).await?;

        // BM25 IDF with 0.5 smoothing, clamped to >= 0. A term missing from
        // the corpus (df == 0) gets the maximum IDF for this corpus size.
        let mut idf_by_term = HashMap::<String, f64>::new();
        for term in &terms {
            let df = *df_map.get(term).unwrap_or(&0);
            let idf = if df == 0 {
                (1.0 + ((docs as f64 + 0.5) / 0.5)).ln()
            } else {
                // Guard n >= df so the log argument stays positive even if
                // the cached stats disagree slightly.
                let n = docs.max(df) as f64;
                (1.0 + ((n - df as f64 + 0.5) / (df as f64 + 0.5))).ln()
            };
            idf_by_term.insert(term.clone(), idf.max(0.0));
        }

        // Best-effort candidate prefilter: on failure we log and fall back to
        // the exact scoring path (an empty candidate set means "no pinning").
        let candidate_set = match self
            .fetch_conversation_candidates(
                &terms,
                &idf_by_term,
                include_tool_events,
                exclude_codex_mcp,
                min_should_match,
                limit,
                query.from_unix_ms,
                query.to_unix_ms,
                query.mode,
            )
            .await
        {
            Ok(set) => set,
            Err(err) => {
                warn!("search_conversations candidate stage failed; falling back to exact path: {err}");
                ConversationCandidateSet::default()
            }
        };
        // Only pin candidate session ids when the candidate stage returned a
        // complete set (not truncated, non-empty, and under the candidate
        // limit); otherwise let the main query scan without the prefilter.
        let candidate_limit = Self::conversation_candidate_limit(limit);
        let candidate_session_ids = if candidate_set.truncated
            || candidate_set.rows.is_empty()
            || candidate_set.rows.len() >= candidate_limit
        {
            None
        } else {
            Some(
                candidate_set
                    .rows
                    .into_iter()
                    .map(|row| row.session_id)
                    .collect::<Vec<_>>(),
            )
        };

        let sql = self.build_search_conversations_sql(
            &terms,
            &idf_by_term,
            avgdl,
            include_tool_events,
            exclude_codex_mcp,
            min_should_match,
            min_score,
            limit,
            query.from_unix_ms,
            query.to_unix_ms,
            query.mode,
            candidate_session_ids.as_deref(),
        )?;

        let rows: Vec<ConversationSearchRow> =
            self.map_backend(self.ch.query_rows(&sql, None).await)?;
        // Batch-fetch snippets for every hit that reported a best event uid;
        // an empty uid string means "no best event" for that row.
        let best_event_uids = rows
            .iter()
            .filter_map(|row| {
                if row.best_event_uid.is_empty() {
                    None
                } else {
                    Some(row.best_event_uid.clone())
                }
            })
            .collect::<Vec<_>>();
        let snippet_by_event_uid = self.fetch_conversation_snippets(&best_event_uids).await?;
        let session_ids = rows
            .iter()
            .map(|row| row.session_id.clone())
            .collect::<Vec<_>>();
        let session_metadata_by_session_id = self
            .fetch_conversation_session_metadata(&session_ids)
            .await?;

        // Assemble ranked hits. Empty strings from the row are treated as
        // "absent"; session metadata (provider/slug/summary) takes precedence
        // over row-level values when present.
        let hits = rows
            .into_iter()
            .enumerate()
            .map(|(idx, row)| {
                let ConversationSearchRow {
                    session_id,
                    first_event_time,
                    first_event_unix_ms,
                    last_event_time,
                    last_event_unix_ms,
                    provider: row_provider,
                    score,
                    matched_terms,
                    event_count_considered,
                    best_event_uid: row_best_event_uid,
                    snippet: row_snippet,
                } = row;
                let session_metadata = session_metadata_by_session_id.get(&session_id);

                let best_event_uid = if row_best_event_uid.is_empty() {
                    None
                } else {
                    Some(row_best_event_uid)
                };
                // Prefer the freshly fetched snippet; fall back to the one
                // the scoring query produced, if any.
                let snippet = best_event_uid
                    .as_ref()
                    .and_then(|event_uid| snippet_by_event_uid.get(event_uid).cloned())
                    .or((!row_snippet.is_empty()).then_some(row_snippet));
                let has_first_event_time = !first_event_time.is_empty();
                let has_last_event_time = !last_event_time.is_empty();
                let provider = session_metadata
                    .and_then(|meta| (!meta.provider.is_empty()).then(|| meta.provider.clone()))
                    .or((!row_provider.is_empty()).then_some(row_provider));
                let session_slug = session_metadata.and_then(|meta| {
                    (!meta.session_slug.is_empty()).then(|| meta.session_slug.clone())
                });
                let session_summary = session_metadata.and_then(|meta| {
                    (!meta.session_summary.is_empty()).then(|| meta.session_summary.clone())
                });
                ConversationSearchHit {
                    // 1-based rank, in the order the scoring query returned.
                    rank: idx + 1,
                    session_id,
                    first_event_time: has_first_event_time.then_some(first_event_time),
                    first_event_unix_ms: has_first_event_time.then_some(first_event_unix_ms),
                    last_event_time: has_last_event_time.then_some(last_event_time),
                    last_event_unix_ms: has_last_event_time.then_some(last_event_unix_ms),
                    provider,
                    session_slug,
                    session_summary,
                    score,
                    matched_terms,
                    event_count_considered,
                    best_event_uid,
                    snippet,
                }
            })
            .collect::<Vec<_>>();

        Ok(ConversationSearchResults {
            query_id,
            query: query_text.to_string(),
            terms,
            stats: ConversationSearchStats {
                docs,
                avgdl,
                took_ms: started.elapsed().as_millis() as u32,
                result_count: hits.len(),
                requested_limit,
                effective_limit: limit,
                limit_capped,
            },
            hits,
        })
    }
  3474 }
  3475  
  3476 fn token_re() -> &'static Regex {
  3477     static TOKEN_RE: OnceLock<Regex> = OnceLock::new();
  3478     TOKEN_RE.get_or_init(|| Regex::new(r"[A-Za-z0-9_]+").expect("valid token regex"))
  3479 }
  3480  
  3481 fn safe_value_re() -> &'static Regex {
  3482     static SAFE_RE: OnceLock<Regex> = OnceLock::new();
  3483     SAFE_RE
  3484         .get_or_init(|| Regex::new(r"^[A-Za-z0-9._:@/-]{1,256}$").expect("valid safe-value regex"))
  3485 }
  3486  
  3487 fn tokenize_query(text: &str, max_terms: usize) -> Vec<(String, u32)> {
  3488     let mut order = Vec::<String>::new();
  3489     let mut tf = HashMap::<String, u32>::new();
  3490  
  3491     for mat in token_re().find_iter(text) {
  3492         let token = mat.as_str().to_ascii_lowercase();
  3493         if token.len() < 2 || token.len() > 64 {
  3494             continue;
  3495         }
  3496  
  3497         if !tf.contains_key(&token) {
  3498             order.push(token.clone());
  3499         }
  3500         let entry = tf.entry(token).or_insert(0);
  3501         *entry += 1;
  3502  
  3503         if order.len() >= max_terms {
  3504             break;
  3505         }
  3506     }
  3507  
  3508     order
  3509         .into_iter()
  3510         .map(|token| {
  3511             let count = *tf.get(&token).unwrap_or(&1);
  3512             (token, count)
  3513         })
  3514         .collect()
  3515 }
  3516  
/// Returns true when `value` is safe to interpolate into SQL as a filter
/// value: 1-256 characters limited to `[A-Za-z0-9._:@/-]` (see
/// `safe_value_re`), which excludes quotes, whitespace, and SQL syntax.
fn is_safe_filter_value(value: &str) -> bool {
    safe_value_re().is_match(value)
}
  3520  
  3521 fn sql_quote(value: &str) -> String {
  3522     format!("'{}'", value.replace('\\', "\\\\").replace('\'', "''"))
  3523 }
  3524  
  3525 fn sql_identifier(value: &str) -> String {
  3526     format!("`{}`", value.replace('`', "``"))
  3527 }
  3528  
  3529 fn sql_array_strings(items: &[String]) -> String {
  3530     let parts = items.iter().map(|item| sql_quote(item)).collect::<Vec<_>>();
  3531     format!("[{}]", parts.join(","))
  3532 }
  3533  
  3534 fn sql_array_f64(items: &[f64]) -> String {
  3535     let parts = items
  3536         .iter()
  3537         .map(|v| format!("{:.12}", v))
  3538         .collect::<Vec<_>>();
  3539     format!("[{}]", parts.join(","))
  3540 }
  3541  
#[cfg(test)]
mod tests {
    use super::*;

    // Baseline search-doc cache entry for the `passes_search_doc_filters`
    // tests; individual tests mutate only the fields under test.
    fn sample_search_doc() -> SearchDocExtraCacheEntry {
        SearchDocExtraCacheEntry {
            session_id: "session-1".to_string(),
            source_name: "source".to_string(),
            provider: "provider".to_string(),
            event_class: "message".to_string(),
            payload_type: "message".to_string(),
            actor_role: "assistant".to_string(),
            name: "tool".to_string(),
            phase: "".to_string(),
            source_ref: "source-ref".to_string(),
            doc_len: 42,
            text_preview: "preview".to_string(),
            has_codex_mcp: 0,
            fetched_at: Instant::now(),
        }
    }

    // Convenience constructor for `SearchRow` fixtures used by the
    // dedupe tests; only the fields that influence dedupe vary.
    fn sample_search_row(
        event_uid: &str,
        session_id: &str,
        event_class: &str,
        payload_type: &str,
        actor_role: &str,
        text_preview: &str,
        score: f64,
        matched_terms: u64,
    ) -> SearchRow {
        SearchRow {
            event_uid: event_uid.to_string(),
            session_id: session_id.to_string(),
            source_name: "source".to_string(),
            provider: "provider".to_string(),
            event_class: event_class.to_string(),
            payload_type: payload_type.to_string(),
            actor_role: actor_role.to_string(),
            name: String::new(),
            phase: String::new(),
            source_ref: "source-ref".to_string(),
            doc_len: 42,
            text_preview: text_preview.to_string(),
            score,
            matched_terms,
        }
    }

    // Tokenizer: dedupes terms, counts repeats ("hello" twice), lowercases,
    // and stops once `max_terms` distinct terms are collected.
    #[test]
    fn tokenize_query_enforces_limits_and_counts() {
        let terms = tokenize_query("Hello hello world tool_use", 3);
        assert_eq!(terms.len(), 3);
        assert_eq!(terms[0], ("hello".to_string(), 2));
        assert_eq!(terms[1].0, "world");
    }

    // Values containing SQL metacharacters (space, semicolon) must be
    // rejected by the safe-filter-value regex.
    #[test]
    fn safe_filter_value_validation() {
        assert!(is_safe_filter_value("session_123"));
        assert!(is_safe_filter_value("a/b.c:d@e-1"));
        assert!(!is_safe_filter_value("drop table;"));
    }

    // Embedded single quotes must come out doubled inside the array literal.
    #[test]
    fn sql_array_builders_escape_values() {
        let values = vec!["a".to_string(), "b'c".to_string()];
        let out = sql_array_strings(&values);
        assert!(out.contains("'a'"));
        assert!(out.contains("'b''c'"));
    }

    // exclude_codex_mcp=true drops docs flagged via has_codex_mcp.
    #[test]
    fn search_doc_filters_exclude_codex_by_flag() {
        let mut row = sample_search_doc();
        row.has_codex_mcp = 1;
        assert!(
            !ClickHouseConversationRepository::passes_search_doc_filters(
                &row, false, None, true, None
            )
        );
    }

    // exclude_codex_mcp=true also drops docs by codex tool name alone.
    #[test]
    fn search_doc_filters_exclude_codex_by_tool_name() {
        let mut row = sample_search_doc();
        row.name = "search".to_string();
        assert!(
            !ClickHouseConversationRepository::passes_search_doc_filters(
                &row, false, None, true, None
            )
        );
    }

    // An explicit event-kind list wins over the include_tool_events toggle
    // in both directions (allows tool_result despite toggle=false, and
    // excludes it despite toggle=true).
    #[test]
    fn search_doc_filters_event_kinds_override_include_tool_toggle() {
        let mut row = sample_search_doc();
        row.event_class = "tool_result".to_string();
        row.payload_type = "tool_result".to_string();

        assert!(ClickHouseConversationRepository::passes_search_doc_filters(
            &row,
            false,
            Some(&[SearchEventKind::ToolResult]),
            false,
            None
        ));
        assert!(
            !ClickHouseConversationRepository::passes_search_doc_filters(
                &row,
                true,
                Some(&[SearchEventKind::Message]),
                false,
                None
            )
        );
    }

    // event_msg/agent_reasoning rows map to the Reasoning kind, not Message.
    #[test]
    fn search_doc_filters_map_event_msg_reasoning() {
        let mut row = sample_search_doc();
        row.event_class = "event_msg".to_string();
        row.payload_type = "agent_reasoning".to_string();

        assert!(ClickHouseConversationRepository::passes_search_doc_filters(
            &row,
            true,
            Some(&[SearchEventKind::Reasoning]),
            false,
            None
        ));
        assert!(
            !ClickHouseConversationRepository::passes_search_doc_filters(
                &row,
                true,
                Some(&[SearchEventKind::Message]),
                false,
                None
            )
        );
    }

    // Some(vec![]) is an error (distinct from None = "no filter").
    #[test]
    fn normalize_event_kinds_rejects_empty_lists() {
        let result = ClickHouseConversationRepository::normalize_event_kinds(Some(vec![]));
        assert!(result.is_err());
    }

    // Normalization dedupes and returns kinds in a canonical sorted order.
    #[test]
    fn normalize_event_kinds_sorts_and_deduplicates() {
        let normalized = ClickHouseConversationRepository::normalize_event_kinds(Some(vec![
            SearchEventKind::ToolResult,
            SearchEventKind::Message,
            SearchEventKind::ToolResult,
        ]))
        .expect("normalize should succeed")
        .expect("normalized kinds should be present");

        assert_eq!(
            normalized,
            vec![SearchEventKind::Message, SearchEventKind::ToolResult]
        );
    }

    // A message and its event_msg mirror (same session, equivalent text
    // modulo whitespace) collapse to one hit, keeping the `message` row.
    #[test]
    fn dedupe_search_rows_prefers_message_over_event_msg_mirror() {
        let rows = vec![
            sample_search_row(
                "uid-event-msg",
                "sess-a",
                "event_msg",
                "agent_message",
                "assistant",
                "Short answer: no",
                18.26,
                3,
            ),
            sample_search_row(
                "uid-message",
                "sess-a",
                "message",
                "message",
                "assistant",
                "Short  answer:\nno",
                18.26,
                3,
            ),
        ];

        let deduped = ClickHouseConversationRepository::dedupe_search_rows(rows, 5);
        assert_eq!(deduped.len(), 1);
        assert_eq!(deduped[0].event_uid, "uid-message");
        assert_eq!(deduped[0].event_class, "message");
    }

    // After a mirror pair collapses, later lower-scoring rows backfill the
    // requested limit.
    #[test]
    fn dedupe_search_rows_fills_limit_after_collapsing_mirrors() {
        let rows = vec![
            sample_search_row(
                "uid-event-msg",
                "sess-a",
                "event_msg",
                "agent_message",
                "assistant",
                "same answer",
                18.26,
                3,
            ),
            sample_search_row(
                "uid-message",
                "sess-a",
                "message",
                "message",
                "assistant",
                "same answer",
                18.26,
                3,
            ),
            sample_search_row(
                "uid-2",
                "sess-b",
                "message",
                "message",
                "assistant",
                "different answer 2",
                17.00,
                2,
            ),
            sample_search_row(
                "uid-3",
                "sess-c",
                "message",
                "message",
                "assistant",
                "different answer 3",
                16.00,
                2,
            ),
        ];

        let deduped = ClickHouseConversationRepository::dedupe_search_rows(rows, 3);
        assert_eq!(deduped.len(), 3);
        assert_eq!(deduped[0].event_uid, "uid-message");
        assert_eq!(deduped[1].event_uid, "uid-2");
        assert_eq!(deduped[2].event_uid, "uid-3");
    }

    // Two genuine `message` rows with identical text are NOT mirrors and
    // must both survive dedupe.
    #[test]
    fn dedupe_search_rows_does_not_collapse_same_kind_hits() {
        let rows = vec![
            sample_search_row(
                "uid-1",
                "sess-a",
                "message",
                "message",
                "assistant",
                "same text",
                10.0,
                2,
            ),
            sample_search_row(
                "uid-2",
                "sess-a",
                "message",
                "message",
                "assistant",
                "same text",
                10.0,
                2,
            ),
        ];

        let deduped = ClickHouseConversationRepository::dedupe_search_rows(rows, 5);
        assert_eq!(deduped.len(), 2);
    }

    // reasoning / event_msg(agent_reasoning) mirror pair collapses to the
    // `reasoning` row, analogous to the message-mirror case above.
    #[test]
    fn dedupe_search_rows_prefers_reasoning_over_event_msg_reasoning_mirror() {
        let rows = vec![
            sample_search_row(
                "uid-event-msg-reasoning",
                "sess-a",
                "event_msg",
                "agent_reasoning",
                "assistant",
                "Let me think about this",
                12.50,
                2,
            ),
            sample_search_row(
                "uid-reasoning",
                "sess-a",
                "reasoning",
                "reasoning",
                "assistant",
                "Let me think about this",
                12.50,
                2,
            ),
        ];

        let deduped = ClickHouseConversationRepository::dedupe_search_rows(rows, 5);
        assert_eq!(deduped.len(), 1);
        assert_eq!(deduped[0].event_uid, "uid-reasoning");
        assert_eq!(deduped[0].event_class, "reasoning");
    }

    // Mirror collapsing is keyed on kind pairs: a reasoning row and a
    // message row with identical text stay separate.
    #[test]
    fn dedupe_search_rows_reasoning_mirrors_do_not_collapse_with_messages() {
        let rows = vec![
            sample_search_row(
                "uid-reasoning",
                "sess-a",
                "reasoning",
                "reasoning",
                "assistant",
                "same text",
                10.0,
                2,
            ),
            sample_search_row(
                "uid-message",
                "sess-a",
                "message",
                "message",
                "assistant",
                "same text",
                10.0,
                2,
            ),
        ];

        let deduped = ClickHouseConversationRepository::dedupe_search_rows(rows, 5);
        assert_eq!(deduped.len(), 2);
    }

    // Low-information classifier: system-role progress/snapshot/system
    // events are noise (role match is case-insensitive); non-system roles
    // and substantive payloads are kept.
    #[test]
    fn low_information_system_event_classifier_targets_open_noise() {
        assert!(
            ClickHouseConversationRepository::is_low_information_system_event("system", "progress")
        );
        assert!(
            ClickHouseConversationRepository::is_low_information_system_event(
                "SYSTEM",
                "file_history_snapshot"
            )
        );
        assert!(
            ClickHouseConversationRepository::is_low_information_system_event("system", "system")
        );
        assert!(
            !ClickHouseConversationRepository::is_low_information_system_event(
                "assistant",
                "progress"
            )
        );
        assert!(
            !ClickHouseConversationRepository::is_low_information_system_event(
                "system",
                "reasoning"
            )
        );
    }

    // include_system_events=true yields no extra WHERE clause; false yields
    // a clause filtering the low-information system payloads above.
    #[test]
    fn open_context_filter_clause_respects_include_system_events_flag() {
        assert_eq!(
            ClickHouseConversationRepository::open_context_filter_clause(true),
            ""
        );
        let filtered_clause = ClickHouseConversationRepository::open_context_filter_clause(false);
        assert!(filtered_clause.contains("progress"));
        assert!(filtered_clause.contains("file_history_snapshot"));
        assert!(filtered_clause.contains("lowerUTF8(actor_role) = 'system'"));
    }
}