crates/moraine-ingest-core/src/sink.rs
1 use crate::checkpoint::{checkpoint_key, merge_checkpoint};
2 use crate::heartbeat::host_name;
3 use crate::model::Checkpoint;
4 use crate::{
5 DispatchState, Metrics, SinkMessage, WATCHER_BACKEND_MIXED, WATCHER_BACKEND_NATIVE,
6 WATCHER_BACKEND_POLL,
7 };
8 use chrono::{DateTime, Utc};
9 use moraine_clickhouse::ClickHouseClient;
10 use serde_json::{json, Value};
11 use std::collections::HashMap;
12 use std::sync::atomic::Ordering;
13 use std::sync::{Arc, Mutex};
14 use std::time::{Duration, Instant};
15 use tokio::sync::{mpsc, RwLock};
16 use tokio::task::JoinHandle;
17 use tracing::{info, warn};
18
19 fn watcher_backend_label(value: u64) -> &'static str {
20 match value {
21 WATCHER_BACKEND_NATIVE => "native",
22 WATCHER_BACKEND_POLL => "poll",
23 WATCHER_BACKEND_MIXED => "mixed",
24 _ => "unknown",
25 }
26 }
27
/// Narrows a `u64` to `u32`, clamping to `u32::MAX` instead of wrapping.
fn saturating_u64_to_u32(value: u64) -> u32 {
    u32::try_from(value).unwrap_or(u32::MAX)
}
31
32 fn duration_from_config_seconds(seconds: f64, minimum_seconds: f64, field_name: &str) -> Duration {
33 if !seconds.is_finite() {
34 warn!("non-finite config value for `{field_name}` ({seconds}); using {minimum_seconds}");
35 return Duration::from_secs_f64(minimum_seconds);
36 }
37
38 let sanitized_seconds = seconds.max(minimum_seconds);
39 Duration::try_from_secs_f64(sanitized_seconds).unwrap_or_else(|_| {
40 warn!(
41 "out-of-range config value for `{field_name}` ({sanitized_seconds}); using {minimum_seconds}"
42 );
43 Duration::from_secs_f64(minimum_seconds)
44 })
45 }
46
/// Nearest-rank percentile over an ascending slice of latencies.
/// Callers must pass a non-empty slice (guarded in debug builds).
fn append_to_visible_percentile(sorted_latencies_ms: &[u64], quantile: f64) -> u64 {
    debug_assert!(!sorted_latencies_ms.is_empty());
    let sample_count = sorted_latencies_ms.len();
    // Nearest-rank definition: ceil(n * q), converted to a zero-based index
    // and clamped into [0, n - 1].
    let nearest_rank = ((sample_count as f64) * quantile).ceil() as usize;
    let index = nearest_rank.saturating_sub(1).min(sample_count - 1);
    sorted_latencies_ms[index]
}
52
53 fn compute_append_to_visible_stats(
54 raw_rows: &[Value],
55 visible_at: DateTime<Utc>,
56 ) -> Option<(u32, u32)> {
57 let mut latencies_ms: Vec<u64> = raw_rows
58 .iter()
59 .filter_map(|row| row.get("record_ts").and_then(Value::as_str))
60 .filter_map(|record_ts| DateTime::parse_from_rfc3339(record_ts).ok())
61 .map(|record_ts| {
62 visible_at
63 .signed_duration_since(record_ts.with_timezone(&Utc))
64 .num_milliseconds()
65 .max(0) as u64
66 })
67 .collect();
68
69 if latencies_ms.is_empty() {
70 return None;
71 }
72
73 latencies_ms.sort_unstable();
74 let p50 = append_to_visible_percentile(&latencies_ms, 0.50);
75 let p95 = append_to_visible_percentile(&latencies_ms, 0.95);
76 Some((saturating_u64_to_u32(p50), saturating_u64_to_u32(p95)))
77 }
78
/// Spawns the background sink task that drains `rx`, buffers rows per target
/// table, and flushes them to ClickHouse on a size threshold or timer, while
/// periodically emitting heartbeats.
///
/// Backpressure: while a flush keeps failing the task stops reading from
/// `rx`, so the bounded channel fills and producers block until a retry
/// succeeds. When the channel closes, any remaining buffered data gets one
/// final flush attempt before the task exits.
pub(crate) fn spawn_sink_task(
    config: moraine_config::AppConfig,
    clickhouse: ClickHouseClient,
    checkpoints: Arc<RwLock<HashMap<String, Checkpoint>>>,
    metrics: Arc<Metrics>,
    mut rx: mpsc::Receiver<SinkMessage>,
    dispatch: Arc<Mutex<DispatchState>>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        // Per-table row buffers; flushed together by flush_pending, which
        // clears each buffer only after its table's insert succeeds.
        let mut raw_rows = Vec::<Value>::new();
        let mut event_rows = Vec::<Value>::new();
        let mut link_rows = Vec::<Value>::new();
        let mut tool_rows = Vec::<Value>::new();
        let mut error_rows = Vec::<Value>::new();
        // Latest checkpoint per source, merged as batches arrive.
        let mut checkpoint_updates = HashMap::<String, Checkpoint>::new();

        // Configured intervals, sanitized against non-finite / too-small
        // values (see duration_from_config_seconds).
        let flush_interval = duration_from_config_seconds(
            config.ingest.flush_interval_seconds,
            0.05,
            "ingest.flush_interval_seconds",
        );
        let heartbeat_interval = duration_from_config_seconds(
            config.ingest.heartbeat_interval_seconds,
            1.0,
            "ingest.heartbeat_interval_seconds",
        );
        // Retry cadence while throttled: twice the flush interval, with a
        // 250ms floor.
        let retry_backoff = duration_from_config_seconds(
            config.ingest.flush_interval_seconds * 2.0,
            0.25,
            "ingest.flush_interval_seconds * 2.0",
        );

        let mut flush_tick = tokio::time::interval(flush_interval);
        let mut heartbeat_tick = tokio::time::interval(heartbeat_interval);
        // True while intake is paused because the last flush failed.
        let mut throttling_flush_retries = false;

        loop {
            // Throttled path: while a prior flush failed and data is still
            // pending, retry the flush instead of reading new messages.
            // Heartbeats still go out while waiting on the backoff timer.
            if throttling_flush_retries
                && has_pending_data(
                    &raw_rows,
                    &event_rows,
                    &link_rows,
                    &tool_rows,
                    &error_rows,
                    &checkpoint_updates,
                )
            {
                if flush_pending(
                    &clickhouse,
                    &checkpoints,
                    &metrics,
                    &mut raw_rows,
                    &mut event_rows,
                    &mut link_rows,
                    &mut tool_rows,
                    &mut error_rows,
                    &mut checkpoint_updates,
                )
                .await
                {
                    throttling_flush_retries = false;
                    info!("flush retry succeeded; resuming sink intake");
                } else {
                    // Back off before the next retry, but don't starve
                    // heartbeats while sleeping.
                    tokio::select! {
                        _ = tokio::time::sleep(retry_backoff) => {}
                        _ = heartbeat_tick.tick() => {
                            emit_heartbeat(&clickhouse, &metrics, &dispatch).await;
                        }
                    }
                }
                continue;
            }

            tokio::select! {
                maybe_msg = rx.recv() => {
                    match maybe_msg {
                        Some(SinkMessage::Batch(batch)) => {
                            // Accumulate the batch into the per-table buffers.
                            raw_rows.extend(batch.raw_rows);
                            event_rows.extend(batch.event_rows);
                            link_rows.extend(batch.link_rows);
                            tool_rows.extend(batch.tool_rows);
                            error_rows.extend(batch.error_rows);
                            if let Some(cp) = batch.checkpoint {
                                merge_checkpoint(&mut checkpoint_updates, cp);
                            }

                            // Size-triggered flush: total buffered rows across
                            // all tables against the configured batch size.
                            let total_rows = raw_rows.len() + event_rows.len() + link_rows.len() + tool_rows.len() + error_rows.len();
                            if total_rows >= config.ingest.batch_size {
                                if !flush_pending(
                                    &clickhouse,
                                    &checkpoints,
                                    &metrics,
                                    &mut raw_rows,
                                    &mut event_rows,
                                    &mut link_rows,
                                    &mut tool_rows,
                                    &mut error_rows,
                                    &mut checkpoint_updates,
                                ).await {
                                    // Log the pause only on the transition
                                    // into throttling, not on every failure.
                                    if !throttling_flush_retries {
                                        warn!(
                                            "flush failed; pausing sink intake and retrying pending rows every {} ms",
                                            retry_backoff.as_millis()
                                        );
                                    }
                                    throttling_flush_retries = true;
                                }
                            }
                        }
                        // Channel closed: all senders dropped, shut down.
                        None => break,
                    }
                }
                // Timer-triggered flush for partially filled buffers.
                _ = flush_tick.tick() => {
                    if has_pending_data(&raw_rows, &event_rows, &link_rows, &tool_rows, &error_rows, &checkpoint_updates) {
                        if !flush_pending(
                            &clickhouse,
                            &checkpoints,
                            &metrics,
                            &mut raw_rows,
                            &mut event_rows,
                            &mut link_rows,
                            &mut tool_rows,
                            &mut error_rows,
                            &mut checkpoint_updates,
                        ).await {
                            if !throttling_flush_retries {
                                warn!(
                                    "flush failed; pausing sink intake and retrying pending rows every {} ms",
                                    retry_backoff.as_millis()
                                );
                            }
                            throttling_flush_retries = true;
                        }
                    }
                }
                _ = heartbeat_tick.tick() => {
                    emit_heartbeat(&clickhouse, &metrics, &dispatch).await;
                }
            }
        }

        // Final drain on shutdown: one best-effort flush of whatever is
        // still buffered (result intentionally ignored — nowhere to retry).
        if has_pending_data(
            &raw_rows,
            &event_rows,
            &link_rows,
            &tool_rows,
            &error_rows,
            &checkpoint_updates,
        ) {
            flush_pending(
                &clickhouse,
                &checkpoints,
                &metrics,
                &mut raw_rows,
                &mut event_rows,
                &mut link_rows,
                &mut tool_rows,
                &mut error_rows,
                &mut checkpoint_updates,
            )
            .await;
        }
    })
}
243
244 fn has_pending_data(
245 raw_rows: &[Value],
246 event_rows: &[Value],
247 link_rows: &[Value],
248 tool_rows: &[Value],
249 error_rows: &[Value],
250 checkpoint_updates: &HashMap<String, Checkpoint>,
251 ) -> bool {
252 !(raw_rows.is_empty()
253 && event_rows.is_empty()
254 && link_rows.is_empty()
255 && tool_rows.is_empty()
256 && error_rows.is_empty()
257 && checkpoint_updates.is_empty())
258 }
259
260 async fn emit_heartbeat(
261 clickhouse: &ClickHouseClient,
262 metrics: &Arc<Metrics>,
263 dispatch: &Arc<Mutex<DispatchState>>,
264 ) {
265 let files_active = {
266 let state = dispatch.lock().expect("dispatch mutex poisoned");
267 state.inflight.len() as u32
268 };
269 let files_watched = metrics.watcher_registrations.load(Ordering::Relaxed) as u32;
270 let last_error = {
271 metrics
272 .last_error
273 .lock()
274 .expect("metrics last_error mutex poisoned")
275 .clone()
276 };
277 let watcher_backend =
278 watcher_backend_label(metrics.watcher_backend_state.load(Ordering::Relaxed));
279
280 let heartbeat = json!({
281 "host": host_name(),
282 "service_version": env!("CARGO_PKG_VERSION"),
283 "queue_depth": metrics.queue_depth.load(Ordering::Relaxed),
284 "files_active": files_active,
285 "files_watched": files_watched,
286 "rows_raw_written": metrics.raw_rows_written.load(Ordering::Relaxed),
287 "rows_events_written": metrics.event_rows_written.load(Ordering::Relaxed),
288 "rows_errors_written": metrics.err_rows_written.load(Ordering::Relaxed),
289 "flush_latency_ms": saturating_u64_to_u32(metrics.last_flush_ms.load(Ordering::Relaxed)),
290 "append_to_visible_p50_ms": saturating_u64_to_u32(metrics.append_to_visible_p50_ms.load(Ordering::Relaxed)),
291 "append_to_visible_p95_ms": saturating_u64_to_u32(metrics.append_to_visible_p95_ms.load(Ordering::Relaxed)),
292 "watcher_backend": watcher_backend,
293 "watcher_error_count": metrics.watcher_error_count.load(Ordering::Relaxed),
294 "watcher_reset_count": metrics.watcher_reset_count.load(Ordering::Relaxed),
295 "watcher_last_reset_unix_ms": metrics.watcher_last_reset_unix_ms.load(Ordering::Relaxed),
296 "last_error": last_error,
297 });
298
299 if let Err(exc) = clickhouse
300 .insert_json_rows("ingest_heartbeats", &[heartbeat])
301 .await
302 {
303 warn!("heartbeat insert failed: {exc}");
304 }
305 }
306
/// Flushes all buffered rows and checkpoint updates to ClickHouse.
///
/// Inserts run as sequential stages (raw_events, events, event_links,
/// tool_io, ingest_errors, ingest_checkpoints). Each stage clears its buffer
/// only after its own insert succeeds, so a failure leaves exactly the
/// unfinished stages pending — a retry never re-sends rows already written.
/// The in-memory checkpoint cache is advanced only after the checkpoint
/// insert succeeds, keeping it consistent with what ClickHouse has.
///
/// Returns `true` when every stage succeeded. On failure it bumps
/// `flush_failures`, records the error in `metrics.last_error`, logs it,
/// and returns `false`; `last_flush_ms` is only updated on full success.
async fn flush_pending(
    clickhouse: &ClickHouseClient,
    checkpoints: &Arc<RwLock<HashMap<String, Checkpoint>>>,
    metrics: &Arc<Metrics>,
    raw_rows: &mut Vec<Value>,
    event_rows: &mut Vec<Value>,
    link_rows: &mut Vec<Value>,
    tool_rows: &mut Vec<Value>,
    error_rows: &mut Vec<Value>,
    checkpoint_updates: &mut HashMap<String, Checkpoint>,
) -> bool {
    let started = Instant::now();

    // Serialize checkpoint updates up front; `checkpoint_updates` itself is
    // kept intact until the checkpoint insert succeeds.
    let checkpoint_rows: Vec<Value> = checkpoint_updates
        .values()
        .map(|cp| {
            json!({
                "source_name": cp.source_name,
                "source_file": cp.source_file,
                "source_inode": cp.source_inode,
                "source_generation": cp.source_generation,
                "last_offset": cp.last_offset,
                "last_line_no": cp.last_line_no,
                "status": cp.status,
            })
        })
        .collect();

    // The async block lets `?` short-circuit at the first failing stage
    // while leaving later buffers untouched for the next retry.
    let flush_result = async {
        if !raw_rows.is_empty() {
            clickhouse.insert_json_rows("raw_events", raw_rows).await?;
            metrics
                .raw_rows_written
                .fetch_add(raw_rows.len() as u64, Ordering::Relaxed);
            // Latency percentiles are computed at the moment rows become
            // visible, i.e. right after the insert succeeds.
            if let Some((p50_ms, p95_ms)) = compute_append_to_visible_stats(raw_rows, Utc::now()) {
                metrics
                    .append_to_visible_p50_ms
                    .store(p50_ms as u64, Ordering::Relaxed);
                metrics
                    .append_to_visible_p95_ms
                    .store(p95_ms as u64, Ordering::Relaxed);
            }
            raw_rows.clear();
        }

        if !event_rows.is_empty() {
            clickhouse.insert_json_rows("events", event_rows).await?;
            metrics
                .event_rows_written
                .fetch_add(event_rows.len() as u64, Ordering::Relaxed);
            event_rows.clear();
        }

        if !link_rows.is_empty() {
            clickhouse
                .insert_json_rows("event_links", link_rows)
                .await?;
            link_rows.clear();
        }

        if !tool_rows.is_empty() {
            clickhouse.insert_json_rows("tool_io", tool_rows).await?;
            tool_rows.clear();
        }

        if !error_rows.is_empty() {
            clickhouse
                .insert_json_rows("ingest_errors", error_rows)
                .await?;
            metrics
                .err_rows_written
                .fetch_add(error_rows.len() as u64, Ordering::Relaxed);
            error_rows.clear();
        }

        if !checkpoint_rows.is_empty() {
            clickhouse
                .insert_json_rows("ingest_checkpoints", &checkpoint_rows)
                .await?;

            // Advance the shared checkpoint cache only after the rows are
            // durable in ClickHouse.
            {
                let mut state = checkpoints.write().await;
                for cp in checkpoint_updates.values() {
                    let key = checkpoint_key(&cp.source_name, &cp.source_file);
                    state.insert(key, cp.clone());
                }
            }
            checkpoint_updates.clear();
        }

        metrics
            .last_flush_ms
            .store(started.elapsed().as_millis() as u64, Ordering::Relaxed);
        anyhow::Result::<()>::Ok(())
    }
    .await;

    match flush_result {
        Ok(()) => true,
        Err(exc) => {
            metrics.flush_failures.fetch_add(1, Ordering::Relaxed);
            *metrics
                .last_error
                .lock()
                .expect("metrics last_error mutex poisoned") = exc.to_string();
            warn!("flush failed: {exc}");
            false
        }
    }
}
417
418 #[cfg(test)]
419 mod tests {
420 use super::*;
421 use crate::model::RowBatch;
422 use axum::{
423 extract::{Query, State},
424 http::StatusCode,
425 routing::post,
426 Router,
427 };
428 use chrono::{DateTime, Utc};
429 use serde_json::json;
430 use tokio::time::timeout;
431
    /// Shared state for the in-process mock ClickHouse HTTP endpoint.
    #[derive(Clone, Default)]
    struct MockClickHouseState {
        // Number of insert attempts observed per table name.
        calls_by_table: Arc<Mutex<HashMap<String, usize>>>,
        // Remaining number of times each table should answer with HTTP 500.
        fail_once_by_table: Arc<Mutex<HashMap<String, usize>>>,
    }
437
438 impl MockClickHouseState {
439 fn with_single_failure(table: &str) -> Self {
440 let state = Self::default();
441 state
442 .fail_once_by_table
443 .lock()
444 .expect("mock fail_once mutex poisoned")
445 .insert(table.to_string(), 1);
446 state
447 }
448
449 fn call_count(&self, table: &str) -> usize {
450 *self
451 .calls_by_table
452 .lock()
453 .expect("mock calls mutex poisoned")
454 .get(table)
455 .unwrap_or(&0)
456 }
457 }
458
459 fn inserted_table_name(query: &str) -> Option<&'static str> {
460 if query.contains("`raw_events`") {
461 Some("raw_events")
462 } else if query.contains("`event_links`") {
463 Some("event_links")
464 } else if query.contains("`tool_io`") {
465 Some("tool_io")
466 } else if query.contains("`ingest_errors`") {
467 Some("ingest_errors")
468 } else if query.contains("`ingest_checkpoints`") {
469 Some("ingest_checkpoints")
470 } else if query.contains("`events`") {
471 Some("events")
472 } else {
473 None
474 }
475 }
476
477 async fn mock_clickhouse_handler(
478 State(state): State<MockClickHouseState>,
479 Query(params): Query<HashMap<String, String>>,
480 ) -> (StatusCode, String) {
481 let query = params.get("query").cloned().unwrap_or_default();
482 let Some(table) = inserted_table_name(&query) else {
483 return (
484 StatusCode::BAD_REQUEST,
485 format!("unexpected query payload: {query}"),
486 );
487 };
488
489 {
490 let mut calls = state
491 .calls_by_table
492 .lock()
493 .expect("mock calls mutex poisoned");
494 *calls.entry(table.to_string()).or_insert(0) += 1;
495 }
496
497 let mut fail_once = state
498 .fail_once_by_table
499 .lock()
500 .expect("mock fail_once mutex poisoned");
501 if let Some(remaining) = fail_once.get_mut(table) {
502 if *remaining > 0 {
503 *remaining -= 1;
504 return (
505 StatusCode::INTERNAL_SERVER_ERROR,
506 format!("intentional failure for {table}"),
507 );
508 }
509 }
510
511 (StatusCode::OK, String::new())
512 }
513
514 async fn spawn_mock_clickhouse(
515 fail_once_table: &str,
516 ) -> (ClickHouseClient, MockClickHouseState) {
517 let state = MockClickHouseState::with_single_failure(fail_once_table);
518 let app = Router::new()
519 .route("/", post(mock_clickhouse_handler))
520 .with_state(state.clone());
521
522 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
523 .await
524 .expect("bind mock clickhouse listener");
525 let addr = listener.local_addr().expect("mock listener addr");
526
527 tokio::spawn(async move {
528 let _ = axum::serve(listener, app).await;
529 });
530
531 let mut config = moraine_config::AppConfig::default();
532 config.clickhouse.url = format!("http://{}", addr);
533 config.clickhouse.timeout_seconds = 1.0;
534 let clickhouse = ClickHouseClient::new(config.clickhouse)
535 .expect("mock clickhouse client should initialize");
536
537 (clickhouse, state)
538 }
539
540 fn sample_checkpoint() -> Checkpoint {
541 Checkpoint {
542 source_name: "source-a".to_string(),
543 source_file: "/tmp/source-a.jsonl".to_string(),
544 source_inode: 42,
545 source_generation: 1,
546 last_offset: 100,
547 last_line_no: 3,
548 status: "active".to_string(),
549 }
550 }
551
552 fn single_row_batch(id: u64) -> SinkMessage {
553 let mut batch = RowBatch::default();
554 batch.raw_rows.push(json!({ "id": id }));
555 SinkMessage::Batch(batch)
556 }
557
558 #[tokio::test]
559 async fn failed_flush_throttles_sink_consumption() {
560 let mut config = moraine_config::AppConfig::default();
561 config.clickhouse.url = "http://127.0.0.1:1".to_string();
562 config.clickhouse.timeout_seconds = 1.0;
563 config.ingest.batch_size = 1;
564 config.ingest.flush_interval_seconds = 0.05;
565 config.ingest.heartbeat_interval_seconds = 60.0;
566
567 let clickhouse = ClickHouseClient::new(config.clickhouse.clone())
568 .expect("clickhouse client should initialize");
569 let checkpoints = Arc::new(RwLock::new(HashMap::new()));
570 let metrics = Arc::new(Metrics::default());
571 let dispatch = Arc::new(Mutex::new(DispatchState::default()));
572 let (tx, rx) = mpsc::channel(1);
573
574 let handle = spawn_sink_task(config, clickhouse, checkpoints, metrics, rx, dispatch);
575
576 tx.send(single_row_batch(1))
577 .await
578 .expect("first send should succeed");
579 tx.send(single_row_batch(2))
580 .await
581 .expect("second send should succeed");
582
583 let third_send = timeout(Duration::from_millis(350), tx.send(single_row_batch(3))).await;
584 assert!(
585 third_send.is_err(),
586 "third send should block while sink retries failed flushes"
587 );
588
589 handle.abort();
590 }
591
    /// End-to-end check of flush_pending's partial-retry behavior: a failure
    /// in the `events` stage must leave only the unfinished stages pending,
    /// and the retry must not re-insert the already-written `raw_events`.
    #[tokio::test(flavor = "multi_thread")]
    async fn flush_pending_retries_only_unfinished_tables() {
        // Mock server fails only the first `events` insert.
        let (clickhouse, mock_state) = spawn_mock_clickhouse("events").await;
        let checkpoints = Arc::new(RwLock::new(HashMap::new()));
        let metrics = Arc::new(Metrics::default());

        // One raw row, one event row, and one checkpoint update pending.
        let mut raw_rows = vec![json!({
            "record_ts": "2026-02-17T00:00:01.000Z",
            "event_uid": "evt-1"
        })];
        let mut event_rows = vec![json!({"event_uid": "evt-1"})];
        let mut link_rows = Vec::<Value>::new();
        let mut tool_rows = Vec::<Value>::new();
        let mut error_rows = Vec::<Value>::new();
        let mut checkpoint_updates = HashMap::new();
        let checkpoint = sample_checkpoint();
        checkpoint_updates.insert(
            checkpoint_key(&checkpoint.source_name, &checkpoint.source_file),
            checkpoint.clone(),
        );

        // First attempt: raw_events succeeds, events fails, later stages
        // (including checkpoints) are never reached.
        let first_attempt = flush_pending(
            &clickhouse,
            &checkpoints,
            &metrics,
            &mut raw_rows,
            &mut event_rows,
            &mut link_rows,
            &mut tool_rows,
            &mut error_rows,
            &mut checkpoint_updates,
        )
        .await;
        assert!(!first_attempt, "first flush should fail at events stage");

        // The succeeded stage was cleared; the failed + unreached stages
        // stay buffered for retry.
        assert!(raw_rows.is_empty(), "raw rows should not be retried");
        assert_eq!(
            event_rows.len(),
            1,
            "event rows remain pending after failure"
        );
        assert_eq!(
            checkpoint_updates.len(),
            1,
            "checkpoint update must remain pending until checkpoint flush succeeds"
        );
        // Metrics reflect exactly what was written so far.
        assert_eq!(metrics.raw_rows_written.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.event_rows_written.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.flush_failures.load(Ordering::Relaxed), 1);

        // Second attempt: only the remaining stages run, and all succeed.
        let second_attempt = flush_pending(
            &clickhouse,
            &checkpoints,
            &metrics,
            &mut raw_rows,
            &mut event_rows,
            &mut link_rows,
            &mut tool_rows,
            &mut error_rows,
            &mut checkpoint_updates,
        )
        .await;
        assert!(
            second_attempt,
            "second flush should complete remaining stages"
        );

        assert!(event_rows.is_empty());
        assert!(checkpoint_updates.is_empty());
        assert_eq!(metrics.raw_rows_written.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.event_rows_written.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.flush_failures.load(Ordering::Relaxed), 1);

        // Server-side view: raw_events inserted once, events retried once,
        // checkpoints reached only on the second attempt.
        assert_eq!(mock_state.call_count("raw_events"), 1);
        assert_eq!(mock_state.call_count("events"), 2);
        assert_eq!(mock_state.call_count("ingest_checkpoints"), 1);

        // The in-memory checkpoint cache advanced only after the checkpoint
        // stage succeeded.
        let state = checkpoints.read().await;
        let checkpoint_key_value = checkpoint_key(&checkpoint.source_name, &checkpoint.source_file);
        assert!(
            state.contains_key(&checkpoint_key_value),
            "checkpoint cache should advance after checkpoint stage succeeds"
        );
    }
676
677 #[test]
678 fn compute_append_to_visible_stats_uses_real_record_timestamps() {
679 let visible_at = DateTime::parse_from_rfc3339("2026-02-17T00:00:10.000Z")
680 .expect("valid timestamp")
681 .with_timezone(&Utc);
682 let raw_rows = vec![
683 json!({"record_ts": "2026-02-17T00:00:00.000Z"}),
684 json!({"record_ts": "2026-02-17T00:00:05.000Z"}),
685 json!({"record_ts": "2026-02-17T00:00:09.000Z"}),
686 ];
687
688 let (p50, p95) = compute_append_to_visible_stats(&raw_rows, visible_at)
689 .expect("expected percentile stats");
690
691 assert_eq!(p50, 5_000);
692 assert_eq!(p95, 10_000);
693 }
694
695 #[test]
696 fn compute_append_to_visible_stats_returns_none_for_unparseable_rows() {
697 let visible_at = DateTime::parse_from_rfc3339("2026-02-17T00:00:10.000Z")
698 .expect("valid timestamp")
699 .with_timezone(&Utc);
700 let raw_rows = vec![
701 json!({"record_ts": "not-a-timestamp"}),
702 json!({"record_ts": ""}),
703 json!({}),
704 ];
705
706 let stats = compute_append_to_visible_stats(&raw_rows, visible_at);
707 assert!(stats.is_none());
708 }
709
710 #[test]
711 fn compute_append_to_visible_stats_clamps_future_timestamps_to_zero() {
712 let visible_at = DateTime::parse_from_rfc3339("2026-02-17T00:00:10.000Z")
713 .expect("valid timestamp")
714 .with_timezone(&Utc);
715 let raw_rows = vec![json!({"record_ts": "2026-02-17T00:00:20.000Z"})];
716
717 let (p50, p95) = compute_append_to_visible_stats(&raw_rows, visible_at)
718 .expect("expected percentile stats");
719
720 assert_eq!(p50, 0);
721 assert_eq!(p95, 0);
722 }
723
724 #[test]
725 fn duration_from_config_seconds_clamps_to_minimum() {
726 let duration = duration_from_config_seconds(0.001, 0.05, "ingest.flush_interval_seconds");
727 assert_eq!(duration, Duration::from_millis(50));
728 }
729
730 #[test]
731 fn duration_from_config_seconds_handles_non_finite_values() {
732 let nan = duration_from_config_seconds(f64::NAN, 0.05, "ingest.flush_interval_seconds");
733 let pos_inf =
734 duration_from_config_seconds(f64::INFINITY, 0.05, "ingest.flush_interval_seconds");
735 let neg_inf =
736 duration_from_config_seconds(f64::NEG_INFINITY, 0.05, "ingest.flush_interval_seconds");
737
738 assert_eq!(nan, Duration::from_millis(50));
739 assert_eq!(pos_inf, Duration::from_millis(50));
740 assert_eq!(neg_inf, Duration::from_millis(50));
741 }
742 }