// crates/moraine-ingest-core/src/sink.rs
     1 use crate::checkpoint::{checkpoint_key, merge_checkpoint};
     2 use crate::heartbeat::host_name;
     3 use crate::model::Checkpoint;
     4 use crate::{
     5     DispatchState, Metrics, SinkMessage, WATCHER_BACKEND_MIXED, WATCHER_BACKEND_NATIVE,
     6     WATCHER_BACKEND_POLL,
     7 };
     8 use chrono::{DateTime, Utc};
     9 use moraine_clickhouse::ClickHouseClient;
    10 use serde_json::{json, Value};
    11 use std::collections::HashMap;
    12 use std::sync::atomic::Ordering;
    13 use std::sync::{Arc, Mutex};
    14 use std::time::{Duration, Instant};
    15 use tokio::sync::{mpsc, RwLock};
    16 use tokio::task::JoinHandle;
    17 use tracing::{info, warn};
    18  
    19 fn watcher_backend_label(value: u64) -> &'static str {
    20     match value {
    21         WATCHER_BACKEND_NATIVE => "native",
    22         WATCHER_BACKEND_POLL => "poll",
    23         WATCHER_BACKEND_MIXED => "mixed",
    24         _ => "unknown",
    25     }
    26 }
    27  
    28 fn saturating_u64_to_u32(value: u64) -> u32 {
    29     value.min(u32::MAX as u64) as u32
    30 }
    31  
    32 fn duration_from_config_seconds(seconds: f64, minimum_seconds: f64, field_name: &str) -> Duration {
    33     if !seconds.is_finite() {
    34         warn!("non-finite config value for `{field_name}` ({seconds}); using {minimum_seconds}");
    35         return Duration::from_secs_f64(minimum_seconds);
    36     }
    37  
    38     let sanitized_seconds = seconds.max(minimum_seconds);
    39     Duration::try_from_secs_f64(sanitized_seconds).unwrap_or_else(|_| {
    40         warn!(
    41             "out-of-range config value for `{field_name}` ({sanitized_seconds}); using {minimum_seconds}"
    42         );
    43         Duration::from_secs_f64(minimum_seconds)
    44     })
    45 }
    46  
    47 fn append_to_visible_percentile(sorted_latencies_ms: &[u64], quantile: f64) -> u64 {
    48     debug_assert!(!sorted_latencies_ms.is_empty());
    49     let rank = ((sorted_latencies_ms.len() as f64) * quantile).ceil() as usize;
    50     sorted_latencies_ms[rank.saturating_sub(1).min(sorted_latencies_ms.len() - 1)]
    51 }
    52  
    53 fn compute_append_to_visible_stats(
    54     raw_rows: &[Value],
    55     visible_at: DateTime<Utc>,
    56 ) -> Option<(u32, u32)> {
    57     let mut latencies_ms: Vec<u64> = raw_rows
    58         .iter()
    59         .filter_map(|row| row.get("record_ts").and_then(Value::as_str))
    60         .filter_map(|record_ts| DateTime::parse_from_rfc3339(record_ts).ok())
    61         .map(|record_ts| {
    62             visible_at
    63                 .signed_duration_since(record_ts.with_timezone(&Utc))
    64                 .num_milliseconds()
    65                 .max(0) as u64
    66         })
    67         .collect();
    68  
    69     if latencies_ms.is_empty() {
    70         return None;
    71     }
    72  
    73     latencies_ms.sort_unstable();
    74     let p50 = append_to_visible_percentile(&latencies_ms, 0.50);
    75     let p95 = append_to_visible_percentile(&latencies_ms, 0.95);
    76     Some((saturating_u64_to_u32(p50), saturating_u64_to_u32(p95)))
    77 }
    78  
/// Spawn the asynchronous sink task that owns all ClickHouse writes.
///
/// The task drains `rx` into per-table staging buffers and flushes when the
/// combined buffered row count reaches `config.ingest.batch_size`, on the
/// periodic flush tick, or once at shutdown after the channel closes.
/// Heartbeats are emitted on their own interval. When a flush fails, intake
/// from `rx` is paused — the bounded channel back-pressures producers — and
/// the pending buffers are retried every `retry_backoff` until they succeed.
pub(crate) fn spawn_sink_task(
    config: moraine_config::AppConfig,
    clickhouse: ClickHouseClient,
    checkpoints: Arc<RwLock<HashMap<String, Checkpoint>>>,
    metrics: Arc<Metrics>,
    mut rx: mpsc::Receiver<SinkMessage>,
    dispatch: Arc<Mutex<DispatchState>>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        // Per-table staging buffers. `flush_pending` clears each buffer only
        // after its table's insert succeeds, so a failed flush leaves just
        // the unfinished tables to retry.
        let mut raw_rows = Vec::<Value>::new();
        let mut event_rows = Vec::<Value>::new();
        let mut link_rows = Vec::<Value>::new();
        let mut tool_rows = Vec::<Value>::new();
        let mut error_rows = Vec::<Value>::new();
        let mut checkpoint_updates = HashMap::<String, Checkpoint>::new();

        // Config-driven cadences, clamped to sane floors.
        let flush_interval = duration_from_config_seconds(
            config.ingest.flush_interval_seconds,
            0.05,
            "ingest.flush_interval_seconds",
        );
        let heartbeat_interval = duration_from_config_seconds(
            config.ingest.heartbeat_interval_seconds,
            1.0,
            "ingest.heartbeat_interval_seconds",
        );
        // Failed-flush retry cadence: twice the flush interval, at least 250 ms.
        let retry_backoff = duration_from_config_seconds(
            config.ingest.flush_interval_seconds * 2.0,
            0.25,
            "ingest.flush_interval_seconds * 2.0",
        );

        let mut flush_tick = tokio::time::interval(flush_interval);
        let mut heartbeat_tick = tokio::time::interval(heartbeat_interval);
        // While true, the loop stops reading from `rx` and only retries the
        // pending buffers (heartbeats keep flowing).
        let mut throttling_flush_retries = false;

        loop {
            // Retry path: skip channel intake entirely until the pending
            // rows flush successfully.
            if throttling_flush_retries
                && has_pending_data(
                    &raw_rows,
                    &event_rows,
                    &link_rows,
                    &tool_rows,
                    &error_rows,
                    &checkpoint_updates,
                )
            {
                if flush_pending(
                    &clickhouse,
                    &checkpoints,
                    &metrics,
                    &mut raw_rows,
                    &mut event_rows,
                    &mut link_rows,
                    &mut tool_rows,
                    &mut error_rows,
                    &mut checkpoint_updates,
                )
                .await
                {
                    throttling_flush_retries = false;
                    info!("flush retry succeeded; resuming sink intake");
                } else {
                    // Wait out the backoff without starving heartbeats.
                    // NOTE(review): if the heartbeat fires first we re-enter
                    // the loop and retry sooner than `retry_backoff` —
                    // presumably acceptable with long heartbeat intervals;
                    // confirm if strict pacing is required.
                    tokio::select! {
                        _ = tokio::time::sleep(retry_backoff) => {}
                        _ = heartbeat_tick.tick() => {
                            emit_heartbeat(&clickhouse, &metrics, &dispatch).await;
                        }
                    }
                }
                continue;
            }

            tokio::select! {
                maybe_msg = rx.recv() => {
                    match maybe_msg {
                        Some(SinkMessage::Batch(batch)) => {
                            // Stage the batch into the per-table buffers and
                            // fold its checkpoint (if any) into the pending map.
                            raw_rows.extend(batch.raw_rows);
                            event_rows.extend(batch.event_rows);
                            link_rows.extend(batch.link_rows);
                            tool_rows.extend(batch.tool_rows);
                            error_rows.extend(batch.error_rows);
                            if let Some(cp) = batch.checkpoint {
                                merge_checkpoint(&mut checkpoint_updates, cp);
                            }

                            // Size-triggered flush across all tables combined.
                            let total_rows = raw_rows.len() + event_rows.len() + link_rows.len() + tool_rows.len() + error_rows.len();
                            if total_rows >= config.ingest.batch_size {
                                if !flush_pending(
                                    &clickhouse,
                                    &checkpoints,
                                    &metrics,
                                    &mut raw_rows,
                                    &mut event_rows,
                                    &mut link_rows,
                                    &mut tool_rows,
                                    &mut error_rows,
                                    &mut checkpoint_updates,
                                ).await {
                                    if !throttling_flush_retries {
                                        warn!(
                                            "flush failed; pausing sink intake and retrying pending rows every {} ms",
                                            retry_backoff.as_millis()
                                        );
                                    }
                                    throttling_flush_retries = true;
                                }
                            }
                        }
                        // Channel closed: fall through to the final drain.
                        None => break,
                    }
                }
                // Time-triggered flush for partially filled buffers.
                _ = flush_tick.tick() => {
                    if has_pending_data(&raw_rows, &event_rows, &link_rows, &tool_rows, &error_rows, &checkpoint_updates) {
                        if !flush_pending(
                            &clickhouse,
                            &checkpoints,
                            &metrics,
                            &mut raw_rows,
                            &mut event_rows,
                            &mut link_rows,
                            &mut tool_rows,
                            &mut error_rows,
                            &mut checkpoint_updates,
                        ).await {
                            if !throttling_flush_retries {
                                warn!(
                                    "flush failed; pausing sink intake and retrying pending rows every {} ms",
                                    retry_backoff.as_millis()
                                );
                            }
                            throttling_flush_retries = true;
                        }
                    }
                }
                _ = heartbeat_tick.tick() => {
                    emit_heartbeat(&clickhouse, &metrics, &dispatch).await;
                }
            }
        }

        // Best-effort final drain on shutdown; a failure here is logged by
        // `flush_pending` but not retried.
        if has_pending_data(
            &raw_rows,
            &event_rows,
            &link_rows,
            &tool_rows,
            &error_rows,
            &checkpoint_updates,
        ) {
            flush_pending(
                &clickhouse,
                &checkpoints,
                &metrics,
                &mut raw_rows,
                &mut event_rows,
                &mut link_rows,
                &mut tool_rows,
                &mut error_rows,
                &mut checkpoint_updates,
            )
            .await;
        }
    })
}
   243  
   244 fn has_pending_data(
   245     raw_rows: &[Value],
   246     event_rows: &[Value],
   247     link_rows: &[Value],
   248     tool_rows: &[Value],
   249     error_rows: &[Value],
   250     checkpoint_updates: &HashMap<String, Checkpoint>,
   251 ) -> bool {
   252     !(raw_rows.is_empty()
   253         && event_rows.is_empty()
   254         && link_rows.is_empty()
   255         && tool_rows.is_empty()
   256         && error_rows.is_empty()
   257         && checkpoint_updates.is_empty())
   258 }
   259  
   260 async fn emit_heartbeat(
   261     clickhouse: &ClickHouseClient,
   262     metrics: &Arc<Metrics>,
   263     dispatch: &Arc<Mutex<DispatchState>>,
   264 ) {
   265     let files_active = {
   266         let state = dispatch.lock().expect("dispatch mutex poisoned");
   267         state.inflight.len() as u32
   268     };
   269     let files_watched = metrics.watcher_registrations.load(Ordering::Relaxed) as u32;
   270     let last_error = {
   271         metrics
   272             .last_error
   273             .lock()
   274             .expect("metrics last_error mutex poisoned")
   275             .clone()
   276     };
   277     let watcher_backend =
   278         watcher_backend_label(metrics.watcher_backend_state.load(Ordering::Relaxed));
   279  
   280     let heartbeat = json!({
   281         "host": host_name(),
   282         "service_version": env!("CARGO_PKG_VERSION"),
   283         "queue_depth": metrics.queue_depth.load(Ordering::Relaxed),
   284         "files_active": files_active,
   285         "files_watched": files_watched,
   286         "rows_raw_written": metrics.raw_rows_written.load(Ordering::Relaxed),
   287         "rows_events_written": metrics.event_rows_written.load(Ordering::Relaxed),
   288         "rows_errors_written": metrics.err_rows_written.load(Ordering::Relaxed),
   289         "flush_latency_ms": saturating_u64_to_u32(metrics.last_flush_ms.load(Ordering::Relaxed)),
   290         "append_to_visible_p50_ms": saturating_u64_to_u32(metrics.append_to_visible_p50_ms.load(Ordering::Relaxed)),
   291         "append_to_visible_p95_ms": saturating_u64_to_u32(metrics.append_to_visible_p95_ms.load(Ordering::Relaxed)),
   292         "watcher_backend": watcher_backend,
   293         "watcher_error_count": metrics.watcher_error_count.load(Ordering::Relaxed),
   294         "watcher_reset_count": metrics.watcher_reset_count.load(Ordering::Relaxed),
   295         "watcher_last_reset_unix_ms": metrics.watcher_last_reset_unix_ms.load(Ordering::Relaxed),
   296         "last_error": last_error,
   297     });
   298  
   299     if let Err(exc) = clickhouse
   300         .insert_json_rows("ingest_heartbeats", &[heartbeat])
   301         .await
   302     {
   303         warn!("heartbeat insert failed: {exc}");
   304     }
   305 }
   306  
/// Flush every staged buffer to ClickHouse, one table per stage, returning
/// `true` only when all stages succeed.
///
/// Each buffer is cleared immediately after its table's insert succeeds, so
/// when a later stage fails, only the unfinished tables (and the pending
/// checkpoint updates) remain for the next retry — already-inserted tables
/// are not re-sent. Stage order: raw_events, events, event_links, tool_io,
/// ingest_errors, ingest_checkpoints. The shared in-memory checkpoint cache
/// advances only after the checkpoint insert succeeds, and `last_flush_ms`
/// is recorded only on a fully successful flush. On failure the error is
/// counted, stored in `metrics.last_error`, and logged.
async fn flush_pending(
    clickhouse: &ClickHouseClient,
    checkpoints: &Arc<RwLock<HashMap<String, Checkpoint>>>,
    metrics: &Arc<Metrics>,
    raw_rows: &mut Vec<Value>,
    event_rows: &mut Vec<Value>,
    link_rows: &mut Vec<Value>,
    tool_rows: &mut Vec<Value>,
    error_rows: &mut Vec<Value>,
    checkpoint_updates: &mut HashMap<String, Checkpoint>,
) -> bool {
    let started = Instant::now();

    // Serialize pending checkpoints up front; `checkpoint_updates` itself is
    // cleared only after the checkpoint insert succeeds.
    let checkpoint_rows: Vec<Value> = checkpoint_updates
        .values()
        .map(|cp| {
            json!({
                "source_name": cp.source_name,
                "source_file": cp.source_file,
                "source_inode": cp.source_inode,
                "source_generation": cp.source_generation,
                "last_offset": cp.last_offset,
                "last_line_no": cp.last_line_no,
                "status": cp.status,
            })
        })
        .collect();

    // The `?`s inside this block short-circuit to the match below, leaving
    // any not-yet-cleared buffers intact for retry.
    let flush_result = async {
        if !raw_rows.is_empty() {
            clickhouse.insert_json_rows("raw_events", raw_rows).await?;
            metrics
                .raw_rows_written
                .fetch_add(raw_rows.len() as u64, Ordering::Relaxed);
            // Raw rows just became visible; refresh the append-to-visible
            // latency percentiles from their `record_ts` values.
            if let Some((p50_ms, p95_ms)) = compute_append_to_visible_stats(raw_rows, Utc::now()) {
                metrics
                    .append_to_visible_p50_ms
                    .store(p50_ms as u64, Ordering::Relaxed);
                metrics
                    .append_to_visible_p95_ms
                    .store(p95_ms as u64, Ordering::Relaxed);
            }
            raw_rows.clear();
        }

        if !event_rows.is_empty() {
            clickhouse.insert_json_rows("events", event_rows).await?;
            metrics
                .event_rows_written
                .fetch_add(event_rows.len() as u64, Ordering::Relaxed);
            event_rows.clear();
        }

        if !link_rows.is_empty() {
            clickhouse
                .insert_json_rows("event_links", link_rows)
                .await?;
            link_rows.clear();
        }

        if !tool_rows.is_empty() {
            clickhouse.insert_json_rows("tool_io", tool_rows).await?;
            tool_rows.clear();
        }

        if !error_rows.is_empty() {
            clickhouse
                .insert_json_rows("ingest_errors", error_rows)
                .await?;
            metrics
                .err_rows_written
                .fetch_add(error_rows.len() as u64, Ordering::Relaxed);
            error_rows.clear();
        }

        if !checkpoint_rows.is_empty() {
            clickhouse
                .insert_json_rows("ingest_checkpoints", &checkpoint_rows)
                .await?;

            // Advance the shared cache only once the rows are durable.
            {
                let mut state = checkpoints.write().await;
                for cp in checkpoint_updates.values() {
                    let key = checkpoint_key(&cp.source_name, &cp.source_file);
                    state.insert(key, cp.clone());
                }
            }
            checkpoint_updates.clear();
        }

        metrics
            .last_flush_ms
            .store(started.elapsed().as_millis() as u64, Ordering::Relaxed);
        anyhow::Result::<()>::Ok(())
    }
    .await;

    match flush_result {
        Ok(()) => true,
        Err(exc) => {
            metrics.flush_failures.fetch_add(1, Ordering::Relaxed);
            *metrics
                .last_error
                .lock()
                .expect("metrics last_error mutex poisoned") = exc.to_string();
            warn!("flush failed: {exc}");
            false
        }
    }
}
   417  
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::RowBatch;
    use axum::{
        extract::{Query, State},
        http::StatusCode,
        routing::post,
        Router,
    };
    use chrono::{DateTime, Utc};
    use serde_json::json;
    use tokio::time::timeout;

    /// Shared state behind the mock ClickHouse HTTP endpoint: per-table call
    /// counts plus a map of how many times each table should fail first.
    #[derive(Clone, Default)]
    struct MockClickHouseState {
        calls_by_table: Arc<Mutex<HashMap<String, usize>>>,
        fail_once_by_table: Arc<Mutex<HashMap<String, usize>>>,
    }

    impl MockClickHouseState {
        // State whose first insert into `table` returns 500, then succeeds.
        fn with_single_failure(table: &str) -> Self {
            let state = Self::default();
            state
                .fail_once_by_table
                .lock()
                .expect("mock fail_once mutex poisoned")
                .insert(table.to_string(), 1);
            state
        }

        // How many insert requests this table has received so far.
        fn call_count(&self, table: &str) -> usize {
            *self
                .calls_by_table
                .lock()
                .expect("mock calls mutex poisoned")
                .get(table)
                .unwrap_or(&0)
        }
    }

    // Recover the target table from the backtick-quoted name in the INSERT
    // query text; `events` is checked last so more specific names win.
    fn inserted_table_name(query: &str) -> Option<&'static str> {
        if query.contains("`raw_events`") {
            Some("raw_events")
        } else if query.contains("`event_links`") {
            Some("event_links")
        } else if query.contains("`tool_io`") {
            Some("tool_io")
        } else if query.contains("`ingest_errors`") {
            Some("ingest_errors")
        } else if query.contains("`ingest_checkpoints`") {
            Some("ingest_checkpoints")
        } else if query.contains("`events`") {
            Some("events")
        } else {
            None
        }
    }

    /// Axum handler standing in for ClickHouse's HTTP interface: counts the
    /// call, then either serves a configured one-shot failure or returns OK.
    async fn mock_clickhouse_handler(
        State(state): State<MockClickHouseState>,
        Query(params): Query<HashMap<String, String>>,
    ) -> (StatusCode, String) {
        let query = params.get("query").cloned().unwrap_or_default();
        let Some(table) = inserted_table_name(&query) else {
            return (
                StatusCode::BAD_REQUEST,
                format!("unexpected query payload: {query}"),
            );
        };

        // Count the attempt regardless of the outcome below.
        {
            let mut calls = state
                .calls_by_table
                .lock()
                .expect("mock calls mutex poisoned");
            *calls.entry(table.to_string()).or_insert(0) += 1;
        }

        // Consume one scripted failure for this table, if any remain.
        let mut fail_once = state
            .fail_once_by_table
            .lock()
            .expect("mock fail_once mutex poisoned");
        if let Some(remaining) = fail_once.get_mut(table) {
            if *remaining > 0 {
                *remaining -= 1;
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    format!("intentional failure for {table}"),
                );
            }
        }

        (StatusCode::OK, String::new())
    }

    /// Bind the mock server on an ephemeral port and return a real
    /// `ClickHouseClient` pointed at it plus the shared mock state.
    async fn spawn_mock_clickhouse(
        fail_once_table: &str,
    ) -> (ClickHouseClient, MockClickHouseState) {
        let state = MockClickHouseState::with_single_failure(fail_once_table);
        let app = Router::new()
            .route("/", post(mock_clickhouse_handler))
            .with_state(state.clone());

        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind mock clickhouse listener");
        let addr = listener.local_addr().expect("mock listener addr");

        // Detached server task; it dies with the test runtime.
        tokio::spawn(async move {
            let _ = axum::serve(listener, app).await;
        });

        let mut config = moraine_config::AppConfig::default();
        config.clickhouse.url = format!("http://{}", addr);
        config.clickhouse.timeout_seconds = 1.0;
        let clickhouse = ClickHouseClient::new(config.clickhouse)
            .expect("mock clickhouse client should initialize");

        (clickhouse, state)
    }

    // Fixed checkpoint fixture shared by the flush tests.
    fn sample_checkpoint() -> Checkpoint {
        Checkpoint {
            source_name: "source-a".to_string(),
            source_file: "/tmp/source-a.jsonl".to_string(),
            source_inode: 42,
            source_generation: 1,
            last_offset: 100,
            last_line_no: 3,
            status: "active".to_string(),
        }
    }

    // A sink message carrying exactly one raw row, to drive batch_size = 1.
    fn single_row_batch(id: u64) -> SinkMessage {
        let mut batch = RowBatch::default();
        batch.raw_rows.push(json!({ "id": id }));
        SinkMessage::Batch(batch)
    }

    /// With an unreachable ClickHouse and batch_size 1, every flush fails, so
    /// the sink must stop draining the (capacity-1) channel: the third send
    /// should still be blocked after 350 ms.
    #[tokio::test]
    async fn failed_flush_throttles_sink_consumption() {
        let mut config = moraine_config::AppConfig::default();
        config.clickhouse.url = "http://127.0.0.1:1".to_string();
        config.clickhouse.timeout_seconds = 1.0;
        config.ingest.batch_size = 1;
        config.ingest.flush_interval_seconds = 0.05;
        config.ingest.heartbeat_interval_seconds = 60.0;

        let clickhouse = ClickHouseClient::new(config.clickhouse.clone())
            .expect("clickhouse client should initialize");
        let checkpoints = Arc::new(RwLock::new(HashMap::new()));
        let metrics = Arc::new(Metrics::default());
        let dispatch = Arc::new(Mutex::new(DispatchState::default()));
        let (tx, rx) = mpsc::channel(1);

        let handle = spawn_sink_task(config, clickhouse, checkpoints, metrics, rx, dispatch);

        tx.send(single_row_batch(1))
            .await
            .expect("first send should succeed");
        tx.send(single_row_batch(2))
            .await
            .expect("second send should succeed");

        let third_send = timeout(Duration::from_millis(350), tx.send(single_row_batch(3))).await;
        assert!(
            third_send.is_err(),
            "third send should block while sink retries failed flushes"
        );

        handle.abort();
    }

    /// Fail only the `events` stage once and verify the stage-wise contract:
    /// earlier stages (raw_events) are not re-sent, later stages and the
    /// checkpoint cache complete on the second attempt.
    #[tokio::test(flavor = "multi_thread")]
    async fn flush_pending_retries_only_unfinished_tables() {
        let (clickhouse, mock_state) = spawn_mock_clickhouse("events").await;
        let checkpoints = Arc::new(RwLock::new(HashMap::new()));
        let metrics = Arc::new(Metrics::default());

        let mut raw_rows = vec![json!({
            "record_ts": "2026-02-17T00:00:01.000Z",
            "event_uid": "evt-1"
        })];
        let mut event_rows = vec![json!({"event_uid": "evt-1"})];
        let mut link_rows = Vec::<Value>::new();
        let mut tool_rows = Vec::<Value>::new();
        let mut error_rows = Vec::<Value>::new();
        let mut checkpoint_updates = HashMap::new();
        let checkpoint = sample_checkpoint();
        checkpoint_updates.insert(
            checkpoint_key(&checkpoint.source_name, &checkpoint.source_file),
            checkpoint.clone(),
        );

        let first_attempt = flush_pending(
            &clickhouse,
            &checkpoints,
            &metrics,
            &mut raw_rows,
            &mut event_rows,
            &mut link_rows,
            &mut tool_rows,
            &mut error_rows,
            &mut checkpoint_updates,
        )
        .await;
        assert!(!first_attempt, "first flush should fail at events stage");

        assert!(raw_rows.is_empty(), "raw rows should not be retried");
        assert_eq!(
            event_rows.len(),
            1,
            "event rows remain pending after failure"
        );
        assert_eq!(
            checkpoint_updates.len(),
            1,
            "checkpoint update must remain pending until checkpoint flush succeeds"
        );
        assert_eq!(metrics.raw_rows_written.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.event_rows_written.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.flush_failures.load(Ordering::Relaxed), 1);

        let second_attempt = flush_pending(
            &clickhouse,
            &checkpoints,
            &metrics,
            &mut raw_rows,
            &mut event_rows,
            &mut link_rows,
            &mut tool_rows,
            &mut error_rows,
            &mut checkpoint_updates,
        )
        .await;
        assert!(
            second_attempt,
            "second flush should complete remaining stages"
        );

        assert!(event_rows.is_empty());
        assert!(checkpoint_updates.is_empty());
        assert_eq!(metrics.raw_rows_written.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.event_rows_written.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.flush_failures.load(Ordering::Relaxed), 1);

        // raw_events inserted once; events failed once then succeeded.
        assert_eq!(mock_state.call_count("raw_events"), 1);
        assert_eq!(mock_state.call_count("events"), 2);
        assert_eq!(mock_state.call_count("ingest_checkpoints"), 1);

        let state = checkpoints.read().await;
        let checkpoint_key_value = checkpoint_key(&checkpoint.source_name, &checkpoint.source_file);
        assert!(
            state.contains_key(&checkpoint_key_value),
            "checkpoint cache should advance after checkpoint stage succeeds"
        );
    }

    /// Latencies 10s/5s/1s → nearest-rank p50 = 5s, p95 = 10s.
    #[test]
    fn compute_append_to_visible_stats_uses_real_record_timestamps() {
        let visible_at = DateTime::parse_from_rfc3339("2026-02-17T00:00:10.000Z")
            .expect("valid timestamp")
            .with_timezone(&Utc);
        let raw_rows = vec![
            json!({"record_ts": "2026-02-17T00:00:00.000Z"}),
            json!({"record_ts": "2026-02-17T00:00:05.000Z"}),
            json!({"record_ts": "2026-02-17T00:00:09.000Z"}),
        ];

        let (p50, p95) = compute_append_to_visible_stats(&raw_rows, visible_at)
            .expect("expected percentile stats");

        assert_eq!(p50, 5_000);
        assert_eq!(p95, 10_000);
    }

    /// Rows with missing or malformed `record_ts` contribute nothing, so the
    /// stats come back as `None`.
    #[test]
    fn compute_append_to_visible_stats_returns_none_for_unparseable_rows() {
        let visible_at = DateTime::parse_from_rfc3339("2026-02-17T00:00:10.000Z")
            .expect("valid timestamp")
            .with_timezone(&Utc);
        let raw_rows = vec![
            json!({"record_ts": "not-a-timestamp"}),
            json!({"record_ts": ""}),
            json!({}),
        ];

        let stats = compute_append_to_visible_stats(&raw_rows, visible_at);
        assert!(stats.is_none());
    }

    /// A record timestamp after `visible_at` must clamp to zero latency, not
    /// go negative.
    #[test]
    fn compute_append_to_visible_stats_clamps_future_timestamps_to_zero() {
        let visible_at = DateTime::parse_from_rfc3339("2026-02-17T00:00:10.000Z")
            .expect("valid timestamp")
            .with_timezone(&Utc);
        let raw_rows = vec![json!({"record_ts": "2026-02-17T00:00:20.000Z"})];

        let (p50, p95) = compute_append_to_visible_stats(&raw_rows, visible_at)
            .expect("expected percentile stats");

        assert_eq!(p50, 0);
        assert_eq!(p95, 0);
    }

    /// Values below the floor are raised to the floor.
    #[test]
    fn duration_from_config_seconds_clamps_to_minimum() {
        let duration = duration_from_config_seconds(0.001, 0.05, "ingest.flush_interval_seconds");
        assert_eq!(duration, Duration::from_millis(50));
    }

    /// NaN and ±infinity all fall back to the configured minimum.
    #[test]
    fn duration_from_config_seconds_handles_non_finite_values() {
        let nan = duration_from_config_seconds(f64::NAN, 0.05, "ingest.flush_interval_seconds");
        let pos_inf =
            duration_from_config_seconds(f64::INFINITY, 0.05, "ingest.flush_interval_seconds");
        let neg_inf =
            duration_from_config_seconds(f64::NEG_INFINITY, 0.05, "ingest.flush_interval_seconds");

        assert_eq!(nan, Duration::from_millis(50));
        assert_eq!(pos_inf, Duration::from_millis(50));
        assert_eq!(neg_inf, Duration::from_millis(50));
    }
}