crates/moraine-mcp-core/src/lib.rs
1 use anyhow::{anyhow, Context, Result};
2 use moraine_clickhouse::ClickHouseClient;
3 use moraine_config::AppConfig;
4 use moraine_conversations::{
5 ClickHouseConversationRepository, ConversationListFilter, ConversationMode,
6 ConversationRepository, ConversationSearchQuery, OpenEventRequest, PageRequest, RepoConfig,
7 SearchEventKind, SearchEventsQuery,
8 };
9 use serde::Deserialize;
10 use serde_json::{json, Value};
11 use std::sync::{
12 atomic::{AtomicBool, Ordering},
13 Arc,
14 };
15 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16 use tracing::{debug, warn};
17
/// Smallest `limit` value any tool accepts (see `tool_limit_bounds`).
const TOOL_LIMIT_MIN: u16 = 1;

/// How a session is assigned exactly one conversation mode. Shown in the
/// `search_conversations` tool description and in prose output when a mode
/// filter is active.
const CONVERSATION_MODE_CLASSIFICATION_SEMANTICS: &str = concat!(
    "Sessions are classified into exactly one mode by first match on any event in the session: ",
    "web_search > mcp_internal > tool_calling > chat."
);

/// Documentation for the optional `mode` argument, reused by the input schemas
/// of `search_conversations` and `list_sessions`.
const SEARCH_CONVERSATIONS_MODE_DOC: &str = concat!(
    "Optional `mode` filters by that computed session mode. Mode meanings: ",
    "web_search=any web search activity (`web_search_call`, `search_results_received`, or `tool_use` with WebSearch/WebFetch); ",
    "mcp_internal=any Codex MCP internal search/open activity (`source_name='codex-mcp'` or tool_name `search`/`open`) when web_search does not match; ",
    "tool_calling=any tool activity (`tool_call`, `tool_result`, or `tool_use`) when neither higher mode matches; ",
    "chat=none of the above."
);
25
26 #[derive(Debug, Clone, Copy, Default, Deserialize)]
27 #[serde(rename_all = "lowercase")]
28 enum Verbosity {
29 #[default]
30 Prose,
31 Full,
32 }
33
34 #[derive(Debug, Deserialize)]
35 struct RpcRequest {
36 #[serde(default)]
37 id: Option<Value>,
38 method: String,
39 #[serde(default)]
40 params: Value,
41 }
42
43 #[derive(Debug, Deserialize)]
44 struct ToolCallParams {
45 name: String,
46 #[serde(default)]
47 arguments: Value,
48 }
49
50 #[derive(Debug, Deserialize)]
51 struct SearchArgs {
52 query: String,
53 #[serde(default)]
54 limit: Option<u16>,
55 #[serde(default)]
56 session_id: Option<String>,
57 #[serde(default)]
58 min_score: Option<f64>,
59 #[serde(default)]
60 min_should_match: Option<u16>,
61 #[serde(default)]
62 include_tool_events: Option<bool>,
63 #[serde(default, alias = "event_kinds", alias = "kind", alias = "kinds")]
64 event_kind: Option<SearchEventKindsArg>,
65 #[serde(default)]
66 exclude_codex_mcp: Option<bool>,
67 #[serde(default)]
68 verbosity: Option<Verbosity>,
69 }
70
71 #[derive(Debug, Clone, Deserialize)]
72 #[serde(untagged)]
73 enum SearchEventKindsArg {
74 One(SearchEventKind),
75 Many(Vec<SearchEventKind>),
76 }
77
78 impl SearchEventKindsArg {
79 fn into_vec(self) -> Vec<SearchEventKind> {
80 match self {
81 Self::One(kind) => vec![kind],
82 Self::Many(kinds) => kinds,
83 }
84 }
85 }
86
87 #[derive(Debug, Deserialize)]
88 struct SearchConversationsArgs {
89 query: String,
90 #[serde(default)]
91 limit: Option<u16>,
92 #[serde(default)]
93 min_score: Option<f64>,
94 #[serde(default)]
95 min_should_match: Option<u16>,
96 #[serde(default)]
97 from_unix_ms: Option<i64>,
98 #[serde(default)]
99 to_unix_ms: Option<i64>,
100 #[serde(default)]
101 mode: Option<ConversationMode>,
102 #[serde(default)]
103 include_tool_events: Option<bool>,
104 #[serde(default)]
105 exclude_codex_mcp: Option<bool>,
106 #[serde(default)]
107 verbosity: Option<Verbosity>,
108 }
109
110 #[derive(Debug, Default, Deserialize)]
111 struct ListSessionsArgs {
112 #[serde(default)]
113 limit: Option<u16>,
114 #[serde(default)]
115 cursor: Option<String>,
116 #[serde(default)]
117 from_unix_ms: Option<i64>,
118 #[serde(default)]
119 to_unix_ms: Option<i64>,
120 #[serde(default)]
121 mode: Option<ConversationMode>,
122 #[serde(default)]
123 verbosity: Option<Verbosity>,
124 }
125
126 #[derive(Debug, Deserialize)]
127 struct OpenArgs {
128 event_uid: String,
129 #[serde(default)]
130 before: Option<u16>,
131 #[serde(default)]
132 after: Option<u16>,
133 #[serde(default)]
134 include_system_events: Option<bool>,
135 #[serde(default)]
136 verbosity: Option<Verbosity>,
137 }
138
139 #[derive(Debug, Default, Deserialize)]
140 struct SearchProsePayload {
141 #[serde(default)]
142 query_id: String,
143 #[serde(default)]
144 query: String,
145 #[serde(default)]
146 stats: SearchProseStats,
147 #[serde(default)]
148 hits: Vec<SearchProseHit>,
149 }
150
151 #[derive(Debug, Default, Deserialize)]
152 struct SearchProseStats {
153 #[serde(default)]
154 took_ms: u64,
155 #[serde(default)]
156 result_count: u64,
157 #[serde(default)]
158 requested_limit: Option<u16>,
159 #[serde(default)]
160 effective_limit: Option<u16>,
161 #[serde(default)]
162 limit_capped: bool,
163 }
164
165 #[derive(Debug, Default, Deserialize)]
166 struct SearchProseHit {
167 #[serde(default)]
168 rank: u64,
169 #[serde(default)]
170 event_uid: String,
171 #[serde(default)]
172 session_id: String,
173 #[serde(default)]
174 first_event_time: String,
175 #[serde(default)]
176 last_event_time: String,
177 #[serde(default)]
178 score: f64,
179 #[serde(default)]
180 event_class: String,
181 #[serde(default)]
182 payload_type: String,
183 #[serde(default)]
184 actor_role: String,
185 #[serde(default)]
186 text_preview: String,
187 }
188
189 #[derive(Debug, Default, Deserialize)]
190 struct OpenProsePayload {
191 #[serde(default)]
192 found: bool,
193 #[serde(default)]
194 event_uid: String,
195 #[serde(default)]
196 session_id: String,
197 #[serde(default)]
198 turn_seq: u32,
199 #[serde(default)]
200 target_event_order: u64,
201 #[serde(default)]
202 before: u16,
203 #[serde(default)]
204 after: u16,
205 #[serde(default)]
206 events: Vec<OpenProseEvent>,
207 }
208
209 #[derive(Debug, Default, Deserialize)]
210 struct OpenProseEvent {
211 #[serde(default)]
212 is_target: bool,
213 #[serde(default)]
214 event_order: u64,
215 #[serde(default)]
216 actor_role: String,
217 #[serde(default)]
218 event_class: String,
219 #[serde(default)]
220 payload_type: String,
221 #[serde(default)]
222 text_content: String,
223 }
224
225 #[derive(Debug, Default, Deserialize)]
226 struct ConversationSearchProsePayload {
227 #[serde(default)]
228 query_id: String,
229 #[serde(default)]
230 query: String,
231 #[serde(default)]
232 stats: ConversationSearchProseStats,
233 #[serde(default)]
234 hits: Vec<ConversationSearchProseHit>,
235 }
236
237 #[derive(Debug, Default, Deserialize)]
238 struct ConversationSearchProseStats {
239 #[serde(default)]
240 took_ms: u64,
241 #[serde(default)]
242 result_count: u64,
243 #[serde(default)]
244 requested_limit: Option<u16>,
245 #[serde(default)]
246 effective_limit: Option<u16>,
247 #[serde(default)]
248 limit_capped: bool,
249 }
250
251 #[derive(Debug, Default, Deserialize)]
252 struct ConversationSearchProseHit {
253 #[serde(default)]
254 rank: u64,
255 #[serde(default)]
256 session_id: String,
257 #[serde(default)]
258 first_event_time: Option<String>,
259 #[serde(default)]
260 first_event_unix_ms: Option<i64>,
261 #[serde(default)]
262 last_event_time: Option<String>,
263 #[serde(default)]
264 last_event_unix_ms: Option<i64>,
265 #[serde(default)]
266 provider: Option<String>,
267 #[serde(default)]
268 session_slug: Option<String>,
269 #[serde(default)]
270 session_summary: Option<String>,
271 #[serde(default)]
272 score: f64,
273 #[serde(default)]
274 matched_terms: u16,
275 #[serde(default)]
276 event_count_considered: u32,
277 #[serde(default)]
278 best_event_uid: Option<String>,
279 #[serde(default)]
280 snippet: Option<String>,
281 }
282
283 #[derive(Debug, Default, Deserialize)]
284 struct SessionListProsePayload {
285 #[serde(default)]
286 sessions: Vec<SessionListProseSession>,
287 #[serde(default)]
288 next_cursor: Option<String>,
289 }
290
291 #[derive(Debug, Default, Deserialize)]
292 struct SessionListProseSession {
293 #[serde(default)]
294 session_id: String,
295 #[serde(default)]
296 start_time: String,
297 #[serde(default)]
298 start_unix_ms: i64,
299 #[serde(default)]
300 end_time: String,
301 #[serde(default)]
302 end_unix_ms: i64,
303 #[serde(default)]
304 event_count: u64,
305 #[serde(default)]
306 mode: String,
307 }
308
309 #[derive(Clone)]
310 struct AppState {
311 cfg: AppConfig,
312 repo: ClickHouseConversationRepository,
313 prewarm_started: Arc<AtomicBool>,
314 }
315
316 impl AppState {
317 async fn handle_request(&self, req: RpcRequest) -> Option<Value> {
318 let id = req.id.clone();
319
320 match req.method.as_str() {
321 "initialize" => {
322 if self
323 .prewarm_started
324 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
325 .is_ok()
326 {
327 if let Err(err) = self.repo.prewarm_mcp_search_state_quick().await {
328 warn!("mcp quick prewarm failed: {}", err);
329 } else {
330 debug!("mcp quick prewarm completed");
331 }
332
333 let repo = self.repo.clone();
334 tokio::spawn(async move {
335 if let Err(err) = repo.prewarm_mcp_search_state().await {
336 warn!("mcp prewarm failed: {}", err);
337 } else {
338 debug!("mcp prewarm completed");
339 }
340 });
341 }
342
343 let result = json!({
344 "protocolVersion": self.cfg.mcp.protocol_version,
345 "capabilities": {
346 "tools": {
347 "listChanged": false
348 }
349 },
350 "serverInfo": {
351 "name": "codex-mcp",
352 "version": env!("CARGO_PKG_VERSION")
353 }
354 });
355
356 id.map(|msg_id| rpc_ok(msg_id, result))
357 }
358 "ping" => id.map(|msg_id| rpc_ok(msg_id, json!({}))),
359 "notifications/initialized" | "initialized" => None,
360 "tools/list" => id.map(|msg_id| rpc_ok(msg_id, self.tools_list_result())),
361 "tools/call" => {
362 let msg_id = id?;
363
364 let parsed: Result<ToolCallParams> =
365 serde_json::from_value(req.params).context("invalid tools/call params payload");
366
367 match parsed {
368 Ok(params) => {
369 let tool_result = match self.call_tool(params).await {
370 Ok(value) => value,
371 Err(err) => tool_error_result(err.to_string()),
372 };
373 Some(rpc_ok(msg_id, tool_result))
374 }
375 Err(err) => Some(rpc_err(msg_id, -32602, &format!("invalid params: {err}"))),
376 }
377 }
378 _ => id.map(|msg_id| {
379 rpc_err(msg_id, -32601, &format!("method not found: {}", req.method))
380 }),
381 }
382 }
383
384 fn tools_list_result(&self) -> Value {
385 let (limit_min, limit_max) = tool_limit_bounds(self.cfg.mcp.max_results);
386 json!({
387 "tools": [
388 {
389 "name": "search",
390 "description": "BM25 lexical search over Moraine indexed conversation events. Bag-of-words ranking: no phrase matching, no stemming. Word order does not matter.",
391 "inputSchema": {
392 "type": "object",
393 "properties": {
394 "query": { "type": "string" },
395 "limit": { "type": "integer", "minimum": limit_min, "maximum": limit_max },
396 "session_id": { "type": "string" },
397 "min_score": { "type": "number" },
398 "min_should_match": { "type": "integer", "minimum": 1, "description": "Minimum number of query terms that must match. Values exceeding the number of query terms are clamped." },
399 "include_tool_events": { "type": "boolean" },
400 "event_kind": {
401 "oneOf": [
402 {
403 "type": "string",
404 "enum": ["message", "reasoning", "tool_call", "tool_result"]
405 },
406 {
407 "type": "array",
408 "items": {
409 "type": "string",
410 "enum": ["message", "reasoning", "tool_call", "tool_result"]
411 }
412 }
413 ]
414 },
415 "exclude_codex_mcp": { "type": "boolean" },
416 "verbosity": {
417 "type": "string",
418 "enum": ["prose", "full"],
419 "default": "prose"
420 }
421 },
422 "required": ["query"]
423 }
424 },
425 {
426 "name": "open",
427 "description": "Open one event by uid with surrounding conversation context.",
428 "inputSchema": {
429 "type": "object",
430 "properties": {
431 "event_uid": { "type": "string" },
432 "before": { "type": "integer", "minimum": 0 },
433 "after": { "type": "integer", "minimum": 0 },
434 "include_system_events": { "type": "boolean", "default": false },
435 "verbosity": {
436 "type": "string",
437 "enum": ["prose", "full"],
438 "default": "prose"
439 }
440 },
441 "required": ["event_uid"]
442 }
443 },
444 {
445 "name": "search_conversations",
446 "description": format!(
447 "BM25 lexical search across whole conversations. {CONVERSATION_MODE_CLASSIFICATION_SEMANTICS} {SEARCH_CONVERSATIONS_MODE_DOC}"
448 ),
449 "inputSchema": {
450 "type": "object",
451 "properties": {
452 "query": { "type": "string" },
453 "limit": { "type": "integer", "minimum": limit_min, "maximum": limit_max },
454 "min_score": { "type": "number" },
455 "min_should_match": { "type": "integer", "minimum": 1, "description": "Minimum number of query terms that must match. Values exceeding the number of query terms are clamped." },
456 "from_unix_ms": { "type": "integer" },
457 "to_unix_ms": { "type": "integer" },
458 "mode": {
459 "type": "string",
460 "enum": ["web_search", "mcp_internal", "tool_calling", "chat"],
461 "description": SEARCH_CONVERSATIONS_MODE_DOC
462 },
463 "include_tool_events": { "type": "boolean" },
464 "exclude_codex_mcp": { "type": "boolean" },
465 "verbosity": {
466 "type": "string",
467 "enum": ["prose", "full"],
468 "default": "prose"
469 }
470 },
471 "required": ["query"]
472 }
473 },
474 {
475 "name": "list_sessions",
476 "description": "List session metadata in a time window without requiring a search query.",
477 "inputSchema": {
478 "type": "object",
479 "properties": {
480 "limit": { "type": "integer", "minimum": 1, "maximum": self.cfg.mcp.max_results },
481 "cursor": { "type": "string" },
482 "from_unix_ms": { "type": "integer" },
483 "to_unix_ms": { "type": "integer" },
484 "mode": {
485 "type": "string",
486 "enum": ["web_search", "mcp_internal", "tool_calling", "chat"],
487 "description": SEARCH_CONVERSATIONS_MODE_DOC
488 },
489 "verbosity": {
490 "type": "string",
491 "enum": ["prose", "full"],
492 "default": "prose"
493 }
494 }
495 }
496 }
497 ]
498 })
499 }
500
501 async fn call_tool(&self, params: ToolCallParams) -> Result<Value> {
502 match params.name.as_str() {
503 "search" => {
504 let mut args: SearchArgs = serde_json::from_value(params.arguments)
505 .context("search expects a JSON object with at least {\"query\": ...}")?;
506 args.limit = validate_tool_limit("search", args.limit, self.cfg.mcp.max_results)?;
507 let verbosity = args.verbosity.unwrap_or_default();
508 let payload = self.search(args).await?;
509 match verbosity {
510 Verbosity::Full => Ok(tool_ok_full(payload)),
511 Verbosity::Prose => Ok(tool_ok_prose(format_search_prose(&payload)?)),
512 }
513 }
514 "open" => {
515 let args: OpenArgs = serde_json::from_value(params.arguments)
516 .context("open expects {\"event_uid\": ...}")?;
517 let verbosity = args.verbosity.unwrap_or_default();
518 let payload = self.open(args).await?;
519 match verbosity {
520 Verbosity::Full => Ok(tool_ok_full(payload)),
521 Verbosity::Prose => Ok(tool_ok_prose(format_open_prose(&payload)?)),
522 }
523 }
524 "search_conversations" => {
525 let mut args: SearchConversationsArgs = serde_json::from_value(params.arguments)
526 .context(
527 "search_conversations expects a JSON object with at least {\"query\": ...}",
528 )?;
529 args.limit = validate_tool_limit(
530 "search_conversations",
531 args.limit,
532 self.cfg.mcp.max_results,
533 )?;
534 let verbosity = args.verbosity.unwrap_or_default();
535 let mode = args.mode;
536 let payload = self.search_conversations(args).await?;
537 match verbosity {
538 Verbosity::Full => Ok(tool_ok_full(payload)),
539 Verbosity::Prose => Ok(tool_ok_prose(format_conversation_search_prose(
540 &payload, mode,
541 )?)),
542 }
543 }
544 "list_sessions" => {
545 let args: ListSessionsArgs = if params.arguments.is_null() {
546 ListSessionsArgs::default()
547 } else {
548 serde_json::from_value(params.arguments)
549 .context("list_sessions expects a JSON object with optional filters")?
550 };
551 let verbosity = args.verbosity.unwrap_or_default();
552 let payload = self.list_sessions(args).await?;
553 match verbosity {
554 Verbosity::Full => Ok(tool_ok_full(payload)),
555 Verbosity::Prose => Ok(tool_ok_prose(format_session_list_prose(&payload)?)),
556 }
557 }
558 other => Err(anyhow!("unknown tool: {other}")),
559 }
560 }
561
562 async fn search(&self, args: SearchArgs) -> Result<Value> {
563 let result = self
564 .repo
565 .search_events(SearchEventsQuery {
566 query: args.query,
567 source: Some("moraine-mcp".to_string()),
568 limit: args.limit,
569 session_id: args.session_id,
570 min_score: args.min_score,
571 min_should_match: args.min_should_match,
572 include_tool_events: args.include_tool_events,
573 event_kinds: args.event_kind.map(SearchEventKindsArg::into_vec),
574 exclude_codex_mcp: args.exclude_codex_mcp,
575 disable_cache: None,
576 search_strategy: None,
577 })
578 .await
579 .map_err(|err| anyhow!(err.to_string()))?;
580
581 serde_json::to_value(result).context("failed to encode search result payload")
582 }
583
584 async fn open(&self, args: OpenArgs) -> Result<Value> {
585 let result = self
586 .repo
587 .open_event(OpenEventRequest {
588 event_uid: args.event_uid,
589 before: args.before,
590 after: args.after,
591 include_system_events: args.include_system_events,
592 })
593 .await
594 .map_err(|err| anyhow!(err.to_string()))?;
595
596 if !result.found {
597 return Ok(json!({
598 "found": false,
599 "event_uid": result.event_uid,
600 "events": [],
601 }));
602 }
603
604 serde_json::to_value(result).context("failed to encode open result payload")
605 }
606
607 async fn search_conversations(&self, args: SearchConversationsArgs) -> Result<Value> {
608 let result = self
609 .repo
610 .search_conversations(ConversationSearchQuery {
611 query: args.query,
612 limit: args.limit,
613 min_score: args.min_score,
614 min_should_match: args.min_should_match,
615 from_unix_ms: args.from_unix_ms,
616 to_unix_ms: args.to_unix_ms,
617 mode: args.mode,
618 include_tool_events: args.include_tool_events,
619 exclude_codex_mcp: args.exclude_codex_mcp,
620 })
621 .await
622 .map_err(|err| anyhow!(err.to_string()))?;
623
624 serde_json::to_value(result).context("failed to encode search_conversations result payload")
625 }
626
627 async fn list_sessions(&self, args: ListSessionsArgs) -> Result<Value> {
628 let ListSessionsArgs {
629 limit,
630 cursor,
631 from_unix_ms,
632 to_unix_ms,
633 mode,
634 verbosity: _,
635 } = args;
636
637 let page = self
638 .repo
639 .list_conversations(
640 ConversationListFilter {
641 from_unix_ms,
642 to_unix_ms,
643 mode,
644 },
645 PageRequest {
646 limit: limit.unwrap_or(self.cfg.mcp.max_results),
647 cursor,
648 },
649 )
650 .await
651 .map_err(|err| anyhow!(err.to_string()))?;
652
653 let sessions = page
654 .items
655 .into_iter()
656 .map(|summary| {
657 json!({
658 "session_id": summary.session_id,
659 "start_time": summary.first_event_time,
660 "start_unix_ms": summary.first_event_unix_ms,
661 "end_time": summary.last_event_time,
662 "end_unix_ms": summary.last_event_unix_ms,
663 "event_count": summary.total_events,
664 "turn_count": summary.total_turns,
665 "user_messages": summary.user_messages,
666 "assistant_messages": summary.assistant_messages,
667 "tool_calls": summary.tool_calls,
668 "tool_results": summary.tool_results,
669 "mode": summary.mode.as_str(),
670 })
671 })
672 .collect::<Vec<_>>();
673
674 Ok(json!({
675 "from_unix_ms": from_unix_ms,
676 "to_unix_ms": to_unix_ms,
677 "mode": mode.map(ConversationMode::as_str),
678 "sessions": sessions,
679 "next_cursor": page.next_cursor,
680 }))
681 }
682 }
683
684 fn tool_limit_bounds(max_results: u16) -> (u16, u16) {
685 (TOOL_LIMIT_MIN, max_results.max(TOOL_LIMIT_MIN))
686 }
687
688 fn validate_tool_limit(
689 tool_name: &str,
690 limit: Option<u16>,
691 max_results: u16,
692 ) -> Result<Option<u16>> {
693 let (min, max) = tool_limit_bounds(max_results);
694 match limit {
695 Some(value) if !(min..=max).contains(&value) => Err(anyhow!(
696 "{tool_name} limit must be between {min} and {max} (received {value})"
697 )),
698 _ => Ok(limit),
699 }
700 }
701
702 fn rpc_ok(id: Value, result: Value) -> Value {
703 json!({
704 "jsonrpc": "2.0",
705 "id": id,
706 "result": result
707 })
708 }
709
710 fn rpc_err(id: Value, code: i64, message: &str) -> Value {
711 json!({
712 "jsonrpc": "2.0",
713 "id": id,
714 "error": {
715 "code": code,
716 "message": message
717 }
718 })
719 }
720
721 fn tool_ok_full(payload: Value) -> Value {
722 let text = serde_json::to_string_pretty(&payload).unwrap_or_else(|_| "{}".to_string());
723 json!({
724 "content": [
725 {
726 "type": "text",
727 "text": text
728 }
729 ],
730 "structuredContent": payload,
731 "isError": false
732 })
733 }
734
735 fn tool_ok_prose(text: String) -> Value {
736 json!({
737 "content": [
738 {
739 "type": "text",
740 "text": text
741 }
742 ],
743 "isError": false
744 })
745 }
746
747 fn tool_error_result(message: String) -> Value {
748 json!({
749 "content": [
750 {
751 "type": "text",
752 "text": message
753 }
754 ],
755 "isError": true
756 })
757 }
758
759 fn format_search_prose(payload: &Value) -> Result<String> {
760 let parsed: SearchProsePayload =
761 serde_json::from_value(payload.clone()).context("failed to parse search payload")?;
762
763 let mut out = String::new();
764 out.push_str(&format!("Search: \"{}\"\n", parsed.query));
765 out.push_str(&format!("Query ID: {}\n", parsed.query_id));
766 out.push_str(&format!(
767 "Hits: {} ({} ms)\n",
768 parsed.stats.result_count, parsed.stats.took_ms
769 ));
770 if let Some(limit_summary) = format_limit_summary(
771 parsed.stats.requested_limit,
772 parsed.stats.effective_limit,
773 parsed.stats.limit_capped,
774 ) {
775 out.push_str(&format!("Limit: {limit_summary}\n"));
776 }
777
778 if parsed.hits.is_empty() {
779 out.push_str("\nNo hits.");
780 return Ok(out);
781 }
782
783 for hit in &parsed.hits {
784 let kind = display_kind(&hit.event_class, &hit.payload_type);
785 let recency = if hit.last_event_time.is_empty() {
786 String::new()
787 } else {
788 format!(" last_event_time={}", hit.last_event_time)
789 };
790 out.push_str(&format!(
791 "\n{}) session={} score={:.4} kind={} role={}{}\n",
792 hit.rank, hit.session_id, hit.score, kind, hit.actor_role, recency
793 ));
794 if !hit.first_event_time.is_empty() && !hit.last_event_time.is_empty() {
795 out.push_str(&format!(
796 " session_window: {} -> {}\n",
797 hit.first_event_time, hit.last_event_time
798 ));
799 }
800
801 let snippet = compact_text_line(&hit.text_preview, 220);
802 if !snippet.is_empty() {
803 out.push_str(&format!(" snippet: {}\n", snippet));
804 }
805
806 out.push_str(&format!(" event_uid: {}\n", hit.event_uid));
807 out.push_str(&format!(" next: open(event_uid=\"{}\")\n", hit.event_uid));
808 }
809
810 Ok(out.trim_end().to_string())
811 }
812
813 fn format_open_prose(payload: &Value) -> Result<String> {
814 let mut parsed: OpenProsePayload =
815 serde_json::from_value(payload.clone()).context("failed to parse open payload")?;
816
817 let mut out = String::new();
818 out.push_str(&format!("Open event: {}\n", parsed.event_uid));
819
820 if !parsed.found {
821 out.push_str("Not found.");
822 return Ok(out);
823 }
824
825 out.push_str(&format!("Session: {}\n", parsed.session_id));
826 out.push_str(&format!("Turn: {}\n", parsed.turn_seq));
827 out.push_str(&format!(
828 "Context window: before={} after={}\n",
829 parsed.before, parsed.after
830 ));
831
832 parsed.events.sort_by_key(|e| e.event_order);
833
834 let mut before_events = Vec::new();
835 let mut target_events = Vec::new();
836 let mut after_events = Vec::new();
837
838 for event in parsed.events {
839 if event.is_target || event.event_order == parsed.target_event_order {
840 target_events.push(event);
841 } else if event.event_order < parsed.target_event_order {
842 before_events.push(event);
843 } else {
844 after_events.push(event);
845 }
846 }
847
848 out.push_str("\nBefore:\n");
849 if before_events.is_empty() {
850 out.push_str("- (none)\n");
851 } else {
852 for event in &before_events {
853 append_open_event_line(&mut out, event);
854 }
855 }
856
857 out.push_str("\nTarget:\n");
858 if target_events.is_empty() {
859 out.push_str("- (none)\n");
860 } else {
861 for event in &target_events {
862 append_open_event_line(&mut out, event);
863 }
864 }
865
866 out.push_str("\nAfter:\n");
867 if after_events.is_empty() {
868 out.push_str("- (none)");
869 } else {
870 for event in &after_events {
871 append_open_event_line(&mut out, event);
872 }
873 }
874
875 Ok(out.trim_end().to_string())
876 }
877
878 fn mode_meaning(mode: ConversationMode) -> &'static str {
879 match mode {
880 ConversationMode::WebSearch => {
881 "any web search activity (`web_search_call`, `search_results_received`, or `tool_use` with WebSearch/WebFetch)"
882 }
883 ConversationMode::McpInternal => {
884 "any Codex MCP internal search/open activity (`source_name='codex-mcp'` or tool_name `search`/`open`) when web_search does not match"
885 }
886 ConversationMode::ToolCalling => {
887 "any tool activity (`tool_call`, `tool_result`, or `tool_use`) when neither higher mode matches"
888 }
889 ConversationMode::Chat => {
890 "no detected web-search, mcp-internal, or tool-calling activity"
891 }
892 }
893 }
894
895 fn format_conversation_search_prose(
896 payload: &Value,
897 mode: Option<ConversationMode>,
898 ) -> Result<String> {
899 let parsed: ConversationSearchProsePayload = serde_json::from_value(payload.clone())
900 .context("failed to parse search_conversations payload")?;
901
902 let mut out = String::new();
903 out.push_str(&format!("Conversation Search: \"{}\"\n", parsed.query));
904 out.push_str(&format!("Query ID: {}\n", parsed.query_id));
905 out.push_str(&format!(
906 "Hits: {} ({} ms)\n",
907 parsed.stats.result_count, parsed.stats.took_ms
908 ));
909 if let Some(limit_summary) = format_limit_summary(
910 parsed.stats.requested_limit,
911 parsed.stats.effective_limit,
912 parsed.stats.limit_capped,
913 ) {
914 out.push_str(&format!("Limit: {limit_summary}\n"));
915 }
916
917 if let Some(mode) = mode {
918 out.push_str(&format!("Mode filter: {}\n", mode.as_str()));
919 out.push_str(&format!(
920 "Mode semantics: {}\n",
921 CONVERSATION_MODE_CLASSIFICATION_SEMANTICS
922 ));
923 out.push_str(&format!("Mode meaning: {}\n", mode_meaning(mode)));
924 }
925
926 if parsed.hits.is_empty() {
927 out.push_str("\nNo hits.");
928 return Ok(out);
929 }
930
931 for hit in &parsed.hits {
932 out.push_str(&format!(
933 "\n{}) session={} score={:.4} matched_terms={} events={}\n",
934 hit.rank, hit.session_id, hit.score, hit.matched_terms, hit.event_count_considered
935 ));
936 if let Some(provider) = hit.provider.as_deref() {
937 out.push_str(&format!(" provider: {}\n", provider));
938 }
939 if let (Some(first), Some(last)) = (
940 hit.first_event_time.as_deref(),
941 hit.last_event_time.as_deref(),
942 ) {
943 out.push_str(&format!(" first_last: {} -> {}\n", first, last));
944 } else if let (Some(first_ms), Some(last_ms)) =
945 (hit.first_event_unix_ms, hit.last_event_unix_ms)
946 {
947 out.push_str(&format!(
948 " first_last_unix_ms: {} -> {}\n",
949 first_ms, last_ms
950 ));
951 }
952 if let Some(session_slug) = hit.session_slug.as_deref() {
953 out.push_str(&format!(" session_slug: {}\n", session_slug));
954 }
955 if let Some(session_summary) = hit.session_summary.as_deref() {
956 let compact = compact_text_line(session_summary, 220);
957 if !compact.is_empty() {
958 out.push_str(&format!(" session_summary: {}\n", compact));
959 }
960 }
961
962 if let Some(best_event_uid) = hit.best_event_uid.as_deref() {
963 out.push_str(&format!(" best_event_uid: {}\n", best_event_uid));
964 out.push_str(&format!(
965 " next: open(event_uid=\"{}\")\n",
966 best_event_uid
967 ));
968 }
969
970 if let Some(snippet) = hit.snippet.as_deref() {
971 let compact = compact_text_line(snippet, 220);
972 if !compact.is_empty() {
973 out.push_str(&format!(" snippet: {}\n", compact));
974 }
975 }
976 }
977
978 Ok(out.trim_end().to_string())
979 }
980
981 fn format_session_list_prose(payload: &Value) -> Result<String> {
982 let parsed: SessionListProsePayload =
983 serde_json::from_value(payload.clone()).context("failed to parse list_sessions payload")?;
984
985 let mut out = String::new();
986 out.push_str("Session List\n");
987 out.push_str(&format!("Sessions: {}\n", parsed.sessions.len()));
988
989 if parsed.sessions.is_empty() {
990 out.push_str("\nNo sessions.");
991 return Ok(out);
992 }
993
994 for (idx, session) in parsed.sessions.iter().enumerate() {
995 let mode = if session.mode.is_empty() {
996 "chat"
997 } else {
998 session.mode.as_str()
999 };
1000
1001 out.push_str(&format!(
1002 "\n{}) session={} mode={} events={}\n",
1003 idx + 1,
1004 session.session_id,
1005 mode,
1006 session.event_count
1007 ));
1008 out.push_str(&format!(
1009 " start: {} (unix_ms={})\n",
1010 session.start_time, session.start_unix_ms
1011 ));
1012 out.push_str(&format!(
1013 " end: {} (unix_ms={})\n",
1014 session.end_time, session.end_unix_ms
1015 ));
1016 }
1017
1018 if let Some(cursor) = parsed.next_cursor.as_deref() {
1019 out.push_str(&format!("\nnext_cursor: {}", cursor));
1020 }
1021
1022 Ok(out.trim_end().to_string())
1023 }
1024
1025 fn append_open_event_line(out: &mut String, event: &OpenProseEvent) {
1026 let kind = display_kind(&event.event_class, &event.payload_type);
1027 out.push_str(&format!(
1028 "- [{}] {} {}\n",
1029 event.event_order, event.actor_role, kind
1030 ));
1031
1032 let text = compact_text_line(&event.text_content, 220);
1033 if !text.is_empty() {
1034 out.push_str(&format!(" {}\n", text));
1035 }
1036 }
1037
/// Describes the limit that was applied to a search, for prose output.
///
/// Returns `None` when the payload did not report an effective limit. When
/// the request was capped, the effective value is reported as the
/// `max_results` cap.
fn format_limit_summary(
    requested_limit: Option<u16>,
    effective_limit: Option<u16>,
    limit_capped: bool,
) -> Option<String> {
    effective_limit.map(|effective| match (requested_limit, limit_capped) {
        (Some(requested), true) => format!(
            "effective={effective} (capped at max_results={effective}; requested={requested})"
        ),
        (Some(requested), false) => format!("effective={effective} (requested={requested})"),
        (None, _) => format!("effective={effective}"),
    })
}
1053
/// Produces a short kind label for an event.
///
/// The payload type is dropped when it adds no information (it is empty,
/// equals the class, or is `unknown`). `event` is used when nothing remains.
fn display_kind(event_class: &str, payload_type: &str) -> String {
    let payload_is_redundant =
        payload_type.is_empty() || payload_type == event_class || payload_type == "unknown";
    match (event_class.is_empty(), payload_is_redundant) {
        (true, true) => "event".to_string(),
        (false, true) => event_class.to_string(),
        (true, false) => payload_type.to_string(),
        (false, false) => format!("{} ({})", event_class, payload_type),
    }
}
1067
/// Collapses every whitespace run in `text` to a single space and truncates the
/// result to at most `max_chars` characters (Unicode scalar values).
///
/// When truncation is needed and the budget can hold it, the tail is replaced
/// by `...`. For `max_chars < 3` the ellipsis alone would exceed the budget, so
/// the text is hard-cut instead. The result never exceeds `max_chars`
/// characters.
fn compact_text_line(text: &str, max_chars: usize) -> String {
    const ELLIPSIS: &str = "...";
    const ELLIPSIS_CHARS: usize = 3;

    let compact = text.split_whitespace().collect::<Vec<_>>().join(" ");
    if compact.chars().count() <= max_chars {
        return compact;
    }

    // Previously this returned "..." here, which is longer than the budget.
    if max_chars < ELLIPSIS_CHARS {
        return compact.chars().take(max_chars).collect();
    }

    let mut trimmed: String = compact.chars().take(max_chars - ELLIPSIS_CHARS).collect();
    trimmed.push_str(ELLIPSIS);
    trimmed
}
1078
1079 pub async fn run_stdio(cfg: AppConfig) -> Result<()> {
1080 let ch = ClickHouseClient::new(cfg.clickhouse.clone())?;
1081
1082 let repo_cfg = RepoConfig {
1083 max_results: cfg.mcp.max_results,
1084 preview_chars: cfg.mcp.preview_chars,
1085 default_context_before: cfg.mcp.default_context_before,
1086 default_context_after: cfg.mcp.default_context_after,
1087 default_include_tool_events: cfg.mcp.default_include_tool_events,
1088 default_exclude_codex_mcp: cfg.mcp.default_exclude_codex_mcp,
1089 async_log_writes: cfg.mcp.async_log_writes,
1090 bm25_k1: cfg.bm25.k1,
1091 bm25_b: cfg.bm25.b,
1092 bm25_default_min_score: cfg.bm25.default_min_score,
1093 bm25_default_min_should_match: cfg.bm25.default_min_should_match,
1094 bm25_max_query_terms: cfg.bm25.max_query_terms,
1095 };
1096
1097 let repo = ClickHouseConversationRepository::new(ch, repo_cfg);
1098 let state = Arc::new(AppState {
1099 cfg,
1100 repo,
1101 prewarm_started: Arc::new(AtomicBool::new(false)),
1102 });
1103
1104 let stdin = BufReader::new(tokio::io::stdin());
1105 let mut lines = stdin.lines();
1106 let mut stdout = tokio::io::stdout();
1107
1108 while let Some(line) = lines.next_line().await? {
1109 let line = line.trim();
1110 if line.is_empty() {
1111 continue;
1112 }
1113
1114 debug!("incoming rpc line: {}", line);
1115
1116 let parsed = serde_json::from_str::<RpcRequest>(line);
1117 let req = match parsed {
1118 Ok(req) => req,
1119 Err(err) => {
1120 warn!("failed to parse rpc request: {}", err);
1121 continue;
1122 }
1123 };
1124
1125 if let Some(resp) = state.handle_request(req).await {
1126 let payload = serde_json::to_vec(&resp)?;
1127 stdout.write_all(&payload).await?;
1128 stdout.write_all(b"\n").await?;
1129 stdout.flush().await?;
1130 }
1131 }
1132
1133 Ok(())
1134 }
1135
// Unit tests for the prose formatters, tool-argument parsing, and limit validation helpers.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn display_kind_compacts_payload_type_when_redundant() {
        assert_eq!(display_kind("message", "message"), "message");
        assert_eq!(display_kind("", "unknown"), "event");
    }

    #[test]
    fn compact_text_line_truncates() {
        let text = "one two three four five";
        let compact = compact_text_line(text, 10);
        // The ellipsis counts toward the limit, so exactly 7 chars of text survive.
        assert_eq!(compact, "one two...");
        assert_eq!(compact.chars().count(), 10);
    }

    #[test]
    fn compact_text_line_collapses_whitespace_and_keeps_fitting_text() {
        assert_eq!(
            compact_text_line("  one \n two\tthree  ", 80),
            "one two three"
        );
        // Text that fits exactly is returned as-is, without an ellipsis.
        assert_eq!(compact_text_line("one two", 7), "one two");
    }

    #[test]
    fn format_conversation_search_handles_empty_hits() {
        let payload = json!({
            "query_id": "q1",
            "query": "hello world",
            "stats": {
                "took_ms": 2,
                "result_count": 0
            },
            "hits": []
        });

        let text = format_conversation_search_prose(&payload, None).expect("format");
        assert!(text.contains("Conversation Search"));
        assert!(text.contains("No hits"));
    }

    #[test]
    fn search_args_accept_single_event_kind_and_alias() {
        let args: SearchArgs = serde_json::from_value(json!({
            "query": "error",
            "kind": "reasoning"
        }))
        .expect("parse search args");

        let parsed = args.event_kind.expect("event kind should parse").into_vec();
        assert_eq!(parsed, vec![SearchEventKind::Reasoning]);
    }

    #[test]
    fn search_args_accept_event_kind_list() {
        let args: SearchArgs = serde_json::from_value(json!({
            "query": "error",
            "event_kind": ["message", "tool_result"]
        }))
        .expect("parse search args");

        let parsed = args.event_kind.expect("event kind should parse").into_vec();
        assert_eq!(
            parsed,
            vec![SearchEventKind::Message, SearchEventKind::ToolResult]
        );
    }

    #[test]
    fn tool_limit_bounds_use_shared_min_and_effective_max() {
        assert_eq!(tool_limit_bounds(25), (1, 25));
        assert_eq!(tool_limit_bounds(0), (1, 1));
    }

    #[test]
    fn validate_tool_limit_enforces_bounds() {
        assert_eq!(
            validate_tool_limit("search", None, 25).expect("missing limit accepted"),
            None
        );
        assert_eq!(
            validate_tool_limit("search", Some(25), 25).expect("max bound accepted"),
            Some(25)
        );

        let zero_err = validate_tool_limit("search", Some(0), 25).expect_err("zero must fail");
        assert_eq!(
            zero_err.to_string(),
            "search limit must be between 1 and 25 (received 0)"
        );

        let high_err = validate_tool_limit("search", Some(26), 25).expect_err("above max fails");
        assert_eq!(
            high_err.to_string(),
            "search limit must be between 1 and 25 (received 26)"
        );
    }

    #[test]
    fn format_search_prose_reports_capped_limit_metadata() {
        let payload = json!({
            "query_id": "q1",
            "query": "big iron",
            "stats": {
                "took_ms": 7,
                "result_count": 25,
                "requested_limit": 100,
                "effective_limit": 25,
                "limit_capped": true
            },
            "hits": []
        });

        let text = format_search_prose(&payload).expect("format");
        assert!(text.contains("Limit: effective=25 (capped at max_results=25; requested=100)"));
    }

    #[test]
    fn format_conversation_search_reports_effective_limit_when_uncapped() {
        let payload = json!({
            "query_id": "q1",
            "query": "hello world",
            "stats": {
                "took_ms": 2,
                "result_count": 0,
                "requested_limit": 10,
                "effective_limit": 10,
                "limit_capped": false
            },
            "hits": []
        });

        let text = format_conversation_search_prose(&payload, None).expect("format");
        assert!(text.contains("Limit: effective=10 (requested=10)"));
    }

    #[test]
    fn format_conversation_search_includes_mode_semantics_when_mode_filter_is_set() {
        let payload = json!({
            "query_id": "q1",
            "query": "hello world",
            "stats": {
                "took_ms": 2,
                "result_count": 0
            },
            "hits": []
        });

        let text = format_conversation_search_prose(&payload, Some(ConversationMode::ToolCalling))
            .expect("format");
        assert!(text.contains("Mode filter: tool_calling"));
        assert!(text.contains("Mode semantics: Sessions are classified into exactly one mode"));
        assert!(text.contains("Mode meaning: any tool activity"));
    }

    #[test]
    fn search_conversations_mode_doc_describes_precedence_and_mode_meanings() {
        assert!(CONVERSATION_MODE_CLASSIFICATION_SEMANTICS
            .contains("web_search > mcp_internal > tool_calling > chat"));
        assert!(SEARCH_CONVERSATIONS_MODE_DOC.contains("web_search=any web search activity"));
        assert!(SEARCH_CONVERSATIONS_MODE_DOC
            .contains("mcp_internal=any Codex MCP internal search/open activity"));
        assert!(SEARCH_CONVERSATIONS_MODE_DOC.contains("tool_calling=any tool activity"));
        assert!(SEARCH_CONVERSATIONS_MODE_DOC.contains("chat=none of the above"));
    }

    #[test]
    fn format_conversation_search_includes_session_metadata() {
        let payload = json!({
            "query_id": "q1",
            "query": "hello world",
            "stats": {
                "took_ms": 2,
                "result_count": 1
            },
            "hits": [
                {
                    "rank": 1,
                    "session_id": "sess_c",
                    "first_event_time": "2026-01-03 10:00:00",
                    "first_event_unix_ms": 1767434400000_i64,
                    "last_event_time": "2026-01-03 10:10:00",
                    "last_event_unix_ms": 1767435000000_i64,
                    "provider": "codex",
                    "session_slug": "project-c",
                    "session_summary": "Session C summary",
                    "score": 12.5,
                    "matched_terms": 2,
                    "event_count_considered": 3,
                    "best_event_uid": "evt-c-42",
                    "snippet": "best match from session c"
                }
            ]
        });

        let text = format_conversation_search_prose(&payload, None).expect("format");
        assert!(text.contains("provider: codex"));
        assert!(text.contains("first_last: 2026-01-03 10:00:00 -> 2026-01-03 10:10:00"));
        assert!(text.contains("session_slug: project-c"));
        assert!(text.contains("session_summary: Session C summary"));
    }

    #[test]
    fn format_search_prose_includes_session_recency() {
        let payload = json!({
            "query_id": "q2",
            "query": "design decision",
            "stats": {
                "took_ms": 3,
                "result_count": 1
            },
            "hits": [
                {
                    "rank": 1,
                    "event_uid": "evt-1",
                    "session_id": "sess-a",
                    "first_event_time": "2026-01-01 00:00:00",
                    "last_event_time": "2026-01-02 00:00:00",
                    "score": 4.2,
                    "event_class": "message",
                    "payload_type": "text",
                    "actor_role": "assistant",
                    "text_preview": "decision details"
                }
            ]
        });

        let text = format_search_prose(&payload).expect("format");
        assert!(text.contains("last_event_time=2026-01-02 00:00:00"));
        assert!(text.contains("session_window: 2026-01-01 00:00:00 -> 2026-01-02 00:00:00"));
    }

    #[test]
    fn format_session_list_handles_empty_result() {
        let payload = json!({
            "sessions": [],
            "next_cursor": null
        });

        let text = format_session_list_prose(&payload).expect("format");
        assert!(text.contains("Session List"));
        assert!(text.contains("No sessions"));
    }

    #[test]
    fn format_session_list_includes_next_cursor_and_times() {
        let payload = json!({
            "sessions": [
                {
                    "session_id": "sess-1",
                    "start_time": "2026-01-02 12:00:00",
                    "start_unix_ms": 1767355200000_i64,
                    "end_time": "2026-01-02 12:05:00",
                    "end_unix_ms": 1767355500000_i64,
                    "event_count": 22_u64,
                    "mode": "web_search"
                }
            ],
            "next_cursor": "cursor-token"
        });

        let text = format_session_list_prose(&payload).expect("format");
        assert!(text.contains("session=sess-1"));
        assert!(text.contains("mode=web_search"));
        assert!(text.contains("next_cursor: cursor-token"));
    }
}