// rust/ingestor/src/ingestor.rs

     1 use crate::clickhouse::ClickHouseClient;
     2 use crate::config::{AppConfig, IngestSource};
     3 use crate::model::{Checkpoint, RowBatch};
     4 use crate::normalize::normalize_record;
     5 use anyhow::{Context, Result};
     6 use glob::glob;
     7 use notify::{Event, RecursiveMode, Watcher};
     8 use serde_json::{json, Value};
     9 use std::collections::{HashMap, HashSet};
    10 use std::io::{BufRead, BufReader, Seek, SeekFrom};
    11 use std::sync::atomic::{AtomicU64, Ordering};
    12 use std::sync::{Arc, Mutex};
    13 use std::time::{Duration, Instant};
    14 use tokio::sync::{mpsc, RwLock, Semaphore};
    15 use tokio::task::JoinHandle;
    16 use tracing::{debug, error, info, warn};
    17  
    18 #[cfg(unix)]
    19 use std::os::unix::fs::MetadataExt;
    20  
    21 #[derive(Debug, Clone)]
    22 struct WorkItem {
    23     source_name: String,
    24     provider: String,
    25     path: String,
    26 }
    27  
    28 impl WorkItem {
    29     fn key(&self) -> String {
    30         format!("{}\n{}", self.source_name, self.path)
    31     }
    32 }
    33  
/// Bookkeeping for per-file work dedup and coalescing.
#[derive(Default)]
struct DispatchState {
    // Keys queued on the process channel but not yet picked up by the processor.
    pending: HashSet<String>,
    // Keys currently being processed by a worker task.
    inflight: HashSet<String>,
    // Keys re-notified while inflight; rescheduled when their worker finishes.
    dirty: HashSet<String>,
    // Latest WorkItem seen per key, used when rescheduling dirty keys.
    item_by_key: HashMap<String, WorkItem>,
}
    41  
/// Process-wide ingest counters, reported in the periodic heartbeat row.
#[derive(Default)]
struct Metrics {
    // Rows successfully flushed, per destination (raw / events / errors).
    raw_rows_written: AtomicU64,
    event_rows_written: AtomicU64,
    err_rows_written: AtomicU64,
    // Duration of the most recent successful flush, in milliseconds.
    last_flush_ms: AtomicU64,
    // Failed flush attempts (buffers are retained and retried).
    flush_failures: AtomicU64,
    // Items currently queued on the process channel.
    queue_depth: AtomicU64,
    // Text of the most recent processing/flush error, surfaced via heartbeat.
    last_error: Mutex<String>,
}
    52  
/// Messages accepted by the sink task; currently only row batches.
#[derive(Debug)]
enum SinkMessage {
    Batch(RowBatch),
}
    57  
/// Entry point: wires up the full ingest pipeline and runs until Ctrl-C.
///
/// Topology: per-source filesystem watcher threads -> debounce task ->
/// bounded process queue -> semaphore-limited file workers -> sink task
/// (batches rows into ClickHouse). A periodic reconcile task re-scans the
/// source globs, and an optional startup backfill queues existing files.
///
/// Returns an error if no sources are enabled, ClickHouse is unreachable,
/// checkpoints cannot be loaded, a watcher cannot be created, or the signal
/// handler fails.
pub async fn run_ingestor(config: AppConfig) -> Result<()> {
    let enabled_sources: Vec<IngestSource> = config
        .ingest
        .sources
        .iter()
        .filter(|src| src.enabled)
        .cloned()
        .collect();

    if enabled_sources.is_empty() {
        return Err(anyhow::anyhow!(
            "no enabled ingest sources found in config.ingest.sources"
        ));
    }

    let clickhouse = ClickHouseClient::new(config.clickhouse.clone())?;
    clickhouse.ping().await.context("clickhouse ping failed")?;

    // Resume state: committed per-(source, file) offsets persisted in ClickHouse.
    let checkpoint_map = clickhouse
        .load_checkpoints()
        .await
        .context("failed to load checkpoints from clickhouse")?;

    info!(
        "loaded {} checkpoints across {} sources",
        checkpoint_map.len(),
        enabled_sources.len()
    );

    let checkpoints = Arc::new(RwLock::new(checkpoint_map));
    let dispatch = Arc::new(Mutex::new(DispatchState::default()));
    let metrics = Arc::new(Metrics::default());

    // Process queue is oversized relative to the sink queue so bursts of
    // watcher notifications don't immediately backpressure the debouncer.
    let process_queue_capacity = config
        .ingest
        .max_inflight_batches
        .saturating_mul(16)
        .max(1024);
    let (process_tx, mut process_rx) = mpsc::channel::<WorkItem>(process_queue_capacity);
    let (sink_tx, sink_rx) =
        mpsc::channel::<SinkMessage>(config.ingest.max_inflight_batches.max(16));
    let (watch_path_tx, watch_path_rx) = mpsc::unbounded_channel::<WorkItem>();

    let sink_handle = spawn_sink_task(
        config.clone(),
        clickhouse.clone(),
        checkpoints.clone(),
        metrics.clone(),
        sink_rx,
        dispatch.clone(),
    );

    // Bounds the number of files processed concurrently.
    let sem = Arc::new(Semaphore::new(config.ingest.max_file_workers.max(1)));
    let processor_handle = {
        let process_tx_clone = process_tx.clone();
        let sink_tx_clone = sink_tx.clone();
        let checkpoints_clone = checkpoints.clone();
        let dispatch_clone = dispatch.clone();
        let sem_clone = sem.clone();
        let metrics_clone = metrics.clone();
        let cfg_clone = config.clone();

        tokio::spawn(async move {
            while let Some(work) = process_rx.recv().await {
                metrics_clone.queue_depth.fetch_sub(1, Ordering::Relaxed);
                let key = work.key();

                // Move the key pending -> inflight before spawning the worker,
                // so re-notifications land in `dirty` (see enqueue_work).
                {
                    let mut state = dispatch_clone.lock().expect("dispatch mutex poisoned");
                    state.pending.remove(&key);
                    state.inflight.insert(key.clone());
                }

                // acquire_owned only fails once the semaphore is closed.
                let permit = match sem_clone.clone().acquire_owned().await {
                    Ok(permit) => permit,
                    Err(_) => break,
                };

                let sink_tx_worker = sink_tx_clone.clone();
                let process_tx_worker = process_tx_clone.clone();
                let checkpoints_worker = checkpoints_clone.clone();
                let dispatch_worker = dispatch_clone.clone();
                let cfg_worker = cfg_clone.clone();
                let metrics_worker = metrics_clone.clone();

                tokio::spawn(async move {
                    // Permit is held for the lifetime of this worker task.
                    let _permit = permit;
                    if let Err(exc) = process_file(
                        &cfg_worker,
                        &work,
                        checkpoints_worker,
                        sink_tx_worker,
                        &metrics_worker,
                    )
                    .await
                    {
                        error!(
                            "failed processing {}:{}: {exc}",
                            work.source_name, work.path
                        );
                        *metrics_worker
                            .last_error
                            .lock()
                            .expect("metrics last_error mutex poisoned") = exc.to_string();
                    }

                    // If the file was re-notified while we worked on it,
                    // requeue it exactly once.
                    let mut reschedule: Option<WorkItem> = None;
                    {
                        let mut state = dispatch_worker.lock().expect("dispatch mutex poisoned");
                        state.inflight.remove(&key);
                        if state.dirty.remove(&key) {
                            if state.pending.insert(key.clone()) {
                                reschedule = state.item_by_key.get(&key).cloned();
                            }
                        }
                    }

                    // Send outside the lock: the mutex guard must not be held
                    // across an await.
                    if let Some(item) = reschedule {
                        if process_tx_worker.send(item).await.is_ok() {
                            metrics_worker.queue_depth.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
        })
    };

    let debounce_handle = spawn_debounce_task(
        config.clone(),
        watch_path_rx,
        process_tx.clone(),
        dispatch.clone(),
        metrics.clone(),
    );

    let reconcile_handle = spawn_reconcile_task(
        config.clone(),
        enabled_sources.clone(),
        process_tx.clone(),
        dispatch.clone(),
        metrics.clone(),
    );

    let watcher_threads = spawn_watcher_threads(enabled_sources.clone(), watch_path_tx)?;

    // Optionally queue every existing file once so history is ingested
    // before live watch events take over.
    if config.ingest.backfill_on_start {
        for source in &enabled_sources {
            let files = enumerate_jsonl_files(&source.glob)?;
            info!(
                "startup backfill queueing {} files for source={}",
                files.len(),
                source.name
            );
            for path in files {
                enqueue_work(
                    WorkItem {
                        source_name: source.name.clone(),
                        provider: source.provider.clone(),
                        path,
                    },
                    &process_tx,
                    &dispatch,
                    &metrics,
                )
                .await;
            }
        }
    }

    info!("rust ingestor running; waiting for shutdown signal");
    tokio::signal::ctrl_c()
        .await
        .context("signal handler failed")?;
    info!("shutdown signal received");

    // Drop our sender halves so tasks can observe end-of-stream.
    drop(process_tx);
    drop(sink_tx);

    // NOTE(review): aborting the sink task cancels the final drain/flush it
    // performs after its channel closes, so rows buffered at shutdown may be
    // dropped — confirm this is intended versus awaiting sink_handle here.
    debounce_handle.abort();
    reconcile_handle.abort();
    processor_handle.abort();
    sink_handle.abort();

    // NOTE(review): watcher threads are never joined; this loop only touches
    // each handle, leaving the threads to die with the process — confirm
    // detaching is intended.
    for handle in watcher_threads {
        let _ = handle.thread().id();
    }

    Ok(())
}
   247  
   248 fn spawn_watcher_threads(
   249     sources: Vec<IngestSource>,
   250     tx: mpsc::UnboundedSender<WorkItem>,
   251 ) -> Result<Vec<std::thread::JoinHandle<()>>> {
   252     let mut handles = Vec::<std::thread::JoinHandle<()>>::new();
   253  
   254     for source in sources {
   255         let source_name = source.name.clone();
   256         let provider = source.provider.clone();
   257         let watch_root = std::path::PathBuf::from(source.watch_root.clone());
   258         let tx_clone = tx.clone();
   259  
   260         info!(
   261             "starting watcher on {} (source={}, provider={})",
   262             watch_root.display(),
   263             source_name,
   264             provider
   265         );
   266  
   267         let handle = std::thread::spawn(move || {
   268             let (event_tx, event_rx) = std::sync::mpsc::channel::<notify::Result<Event>>();
   269  
   270             let mut watcher = match notify::recommended_watcher(move |res| {
   271                 let _ = event_tx.send(res);
   272             }) {
   273                 Ok(watcher) => watcher,
   274                 Err(exc) => {
   275                     eprintln!(
   276                         "[moraine-rust] failed to create watcher for {}: {exc}",
   277                         source_name
   278                     );
   279                     return;
   280                 }
   281             };
   282  
   283             if let Err(exc) = watcher.watch(watch_root.as_path(), RecursiveMode::Recursive) {
   284                 eprintln!(
   285                     "[moraine-rust] failed to watch {} ({}): {exc}",
   286                     watch_root.display(),
   287                     source_name
   288                 );
   289                 return;
   290             }
   291  
   292             loop {
   293                 match event_rx.recv() {
   294                     Ok(Ok(event)) => {
   295                         for path in event.paths {
   296                             let _ = tx_clone.send(WorkItem {
   297                                 source_name: source_name.clone(),
   298                                 provider: provider.clone(),
   299                                 path: path.to_string_lossy().to_string(),
   300                             });
   301                         }
   302                     }
   303                     Ok(Err(exc)) => {
   304                         eprintln!("[moraine-rust] watcher event error ({source_name}): {exc}");
   305                     }
   306                     Err(_) => break,
   307                 }
   308             }
   309         });
   310  
   311         handles.push(handle);
   312     }
   313  
   314     Ok(handles)
   315 }
   316  
   317 fn spawn_debounce_task(
   318     config: AppConfig,
   319     mut rx: mpsc::UnboundedReceiver<WorkItem>,
   320     process_tx: mpsc::Sender<WorkItem>,
   321     dispatch: Arc<Mutex<DispatchState>>,
   322     metrics: Arc<Metrics>,
   323 ) -> JoinHandle<()> {
   324     tokio::spawn(async move {
   325         let debounce = Duration::from_millis(config.ingest.debounce_ms.max(5));
   326         let mut pending = HashMap::<String, (WorkItem, Instant)>::new();
   327         let mut tick = tokio::time::interval(Duration::from_millis(
   328             (config.ingest.debounce_ms / 2).max(10),
   329         ));
   330  
   331         loop {
   332             tokio::select! {
   333                 maybe_work = rx.recv() => {
   334                     match maybe_work {
   335                         Some(work) => {
   336                             pending.insert(work.key(), (work, Instant::now()));
   337                         }
   338                         None => break,
   339                     }
   340                 }
   341                 _ = tick.tick() => {
   342                     if pending.is_empty() {
   343                         continue;
   344                     }
   345  
   346                     let now = Instant::now();
   347                     let ready: Vec<String> = pending
   348                         .iter()
   349                         .filter_map(|(key, (_, seen_at))| {
   350                             if now.duration_since(*seen_at) >= debounce {
   351                                 Some(key.clone())
   352                             } else {
   353                                 None
   354                             }
   355                         })
   356                         .collect();
   357  
   358                     for key in ready {
   359                         if let Some((work, _)) = pending.remove(&key) {
   360                             if !work.path.ends_with(".jsonl") {
   361                                 continue;
   362                             }
   363  
   364                             enqueue_work(work, &process_tx, &dispatch, &metrics).await;
   365                         }
   366                     }
   367                 }
   368             }
   369         }
   370     })
   371 }
   372  
   373 fn spawn_reconcile_task(
   374     config: AppConfig,
   375     sources: Vec<IngestSource>,
   376     process_tx: mpsc::Sender<WorkItem>,
   377     dispatch: Arc<Mutex<DispatchState>>,
   378     metrics: Arc<Metrics>,
   379 ) -> JoinHandle<()> {
   380     tokio::spawn(async move {
   381         let interval = Duration::from_secs_f64(config.ingest.reconcile_interval_seconds.max(5.0));
   382         let mut ticker = tokio::time::interval(interval);
   383  
   384         loop {
   385             ticker.tick().await;
   386             for source in &sources {
   387                 match enumerate_jsonl_files(&source.glob) {
   388                     Ok(paths) => {
   389                         debug!(
   390                             "reconcile scanning {} files for source={}",
   391                             paths.len(),
   392                             source.name
   393                         );
   394                         for path in paths {
   395                             enqueue_work(
   396                                 WorkItem {
   397                                     source_name: source.name.clone(),
   398                                     provider: source.provider.clone(),
   399                                     path,
   400                                 },
   401                                 &process_tx,
   402                                 &dispatch,
   403                                 &metrics,
   404                             )
   405                             .await;
   406                         }
   407                     }
   408                     Err(exc) => {
   409                         warn!("reconcile scan failed for source={}: {exc}", source.name);
   410                     }
   411                 }
   412             }
   413         }
   414     })
   415 }
   416  
/// The single sink task: accumulates row batches from the file workers and
/// flushes them to ClickHouse when either the batch-size threshold or the
/// flush timer fires. Also emits a periodic heartbeat row with current
/// metrics, and performs one final flush after its channel closes.
fn spawn_sink_task(
    config: AppConfig,
    clickhouse: ClickHouseClient,
    checkpoints: Arc<RwLock<HashMap<String, Checkpoint>>>,
    metrics: Arc<Metrics>,
    mut rx: mpsc::Receiver<SinkMessage>,
    dispatch: Arc<Mutex<DispatchState>>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        // Per-table row buffers, accumulated until the next flush.
        let mut raw_rows = Vec::<Value>::new();
        let mut event_rows = Vec::<Value>::new();
        let mut link_rows = Vec::<Value>::new();
        let mut tool_rows = Vec::<Value>::new();
        let mut error_rows = Vec::<Value>::new();
        // Latest checkpoint per (source, file); merged (not appended) so one
        // flush writes at most one checkpoint row per file — see merge_checkpoint.
        let mut checkpoint_updates = HashMap::<String, Checkpoint>::new();

        let flush_interval =
            Duration::from_secs_f64(config.ingest.flush_interval_seconds.max(0.05));
        let heartbeat_interval =
            Duration::from_secs_f64(config.ingest.heartbeat_interval_seconds.max(1.0));

        let mut flush_tick = tokio::time::interval(flush_interval);
        let mut heartbeat_tick = tokio::time::interval(heartbeat_interval);

        loop {
            tokio::select! {
                maybe_msg = rx.recv() => {
                    match maybe_msg {
                        Some(SinkMessage::Batch(batch)) => {
                            raw_rows.extend(batch.raw_rows);
                            event_rows.extend(batch.event_rows);
                            link_rows.extend(batch.link_rows);
                            tool_rows.extend(batch.tool_rows);
                            error_rows.extend(batch.error_rows);
                            if let Some(cp) = batch.checkpoint {
                                merge_checkpoint(&mut checkpoint_updates, cp);
                            }

                            // Size-based flush once the combined buffers reach
                            // the configured batch size.
                            let total_rows = raw_rows.len() + event_rows.len() + link_rows.len() + tool_rows.len() + error_rows.len();
                            if total_rows >= config.ingest.batch_size {
                                flush_pending(
                                    &clickhouse,
                                    &checkpoints,
                                    &metrics,
                                    &mut raw_rows,
                                    &mut event_rows,
                                    &mut link_rows,
                                    &mut tool_rows,
                                    &mut error_rows,
                                    &mut checkpoint_updates,
                                ).await;
                            }
                        }
                        // All senders dropped: leave the loop; final flush below.
                        None => break,
                    }
                }
                // Time-based flush so a slow trickle of rows still becomes
                // visible within roughly one flush interval.
                _ = flush_tick.tick() => {
                    if !(raw_rows.is_empty() && event_rows.is_empty() && link_rows.is_empty() && tool_rows.is_empty() && error_rows.is_empty() && checkpoint_updates.is_empty()) {
                        flush_pending(
                            &clickhouse,
                            &checkpoints,
                            &metrics,
                            &mut raw_rows,
                            &mut event_rows,
                            &mut link_rows,
                            &mut tool_rows,
                            &mut error_rows,
                            &mut checkpoint_updates,
                        ).await;
                    }
                }
                _ = heartbeat_tick.tick() => {
                    // Snapshot gauges; each lock is released before the insert.
                    let files_active = {
                        let state = dispatch.lock().expect("dispatch mutex poisoned");
                        state.inflight.len() as u32
                    };
                    let files_watched = checkpoints.read().await.len() as u32;
                    let last_error = {
                        metrics
                            .last_error
                            .lock()
                            .expect("metrics last_error mutex poisoned")
                            .clone()
                    };

                    let heartbeat = json!({
                        "host": host_name(),
                        "service_version": env!("CARGO_PKG_VERSION"),
                        "queue_depth": metrics.queue_depth.load(Ordering::Relaxed),
                        "files_active": files_active,
                        "files_watched": files_watched,
                        "rows_raw_written": metrics.raw_rows_written.load(Ordering::Relaxed),
                        "rows_events_written": metrics.event_rows_written.load(Ordering::Relaxed),
                        "rows_errors_written": metrics.err_rows_written.load(Ordering::Relaxed),
                        "flush_latency_ms": metrics.last_flush_ms.load(Ordering::Relaxed) as u32,
                        // Latency percentiles are not measured yet; reported as 0.
                        "append_to_visible_p50_ms": 0u32,
                        "append_to_visible_p95_ms": 0u32,
                        "last_error": last_error,
                    });

                    // Heartbeat failures are non-fatal; log and continue.
                    if let Err(exc) = clickhouse.insert_json_rows("ingest_heartbeats", &[heartbeat]).await {
                        warn!("heartbeat insert failed: {exc}");
                    }
                }
            }
        }

        // Channel closed: flush whatever is still buffered before exiting.
        if !(raw_rows.is_empty()
            && event_rows.is_empty()
            && link_rows.is_empty()
            && tool_rows.is_empty()
            && error_rows.is_empty()
            && checkpoint_updates.is_empty())
        {
            flush_pending(
                &clickhouse,
                &checkpoints,
                &metrics,
                &mut raw_rows,
                &mut event_rows,
                &mut link_rows,
                &mut tool_rows,
                &mut error_rows,
                &mut checkpoint_updates,
            )
            .await;
        }
    })
}
   546  
   547 fn checkpoint_key(source_name: &str, source_file: &str) -> String {
   548     format!("{}\n{}", source_name, source_file)
   549 }
   550  
   551 fn merge_checkpoint(pending: &mut HashMap<String, Checkpoint>, checkpoint: Checkpoint) {
   552     let key = checkpoint_key(&checkpoint.source_name, &checkpoint.source_file);
   553     match pending.get(&key) {
   554         None => {
   555             pending.insert(key, checkpoint);
   556         }
   557         Some(existing) => {
   558             let replace = checkpoint.source_generation > existing.source_generation
   559                 || (checkpoint.source_generation == existing.source_generation
   560                     && checkpoint.last_offset >= existing.last_offset);
   561             if replace {
   562                 pending.insert(key, checkpoint);
   563             }
   564         }
   565     }
   566 }
   567  
/// Writes all buffered rows to ClickHouse, then — on success only —
/// publishes the merged checkpoints to the shared in-memory map and clears
/// every buffer. Checkpoint rows are inserted after all data tables, so a
/// failed data insert never advances the committed offsets. On failure the
/// buffers are left intact and the same rows are retried on the next flush
/// (duplicates across a partial success are possible; loss is not).
async fn flush_pending(
    clickhouse: &ClickHouseClient,
    checkpoints: &Arc<RwLock<HashMap<String, Checkpoint>>>,
    metrics: &Arc<Metrics>,
    raw_rows: &mut Vec<Value>,
    event_rows: &mut Vec<Value>,
    link_rows: &mut Vec<Value>,
    tool_rows: &mut Vec<Value>,
    error_rows: &mut Vec<Value>,
    checkpoint_updates: &mut HashMap<String, Checkpoint>,
) {
    let started = Instant::now();

    // Serialize checkpoints without consuming `checkpoint_updates`; it must
    // survive a failed flush for the retry.
    let checkpoint_rows: Vec<Value> = checkpoint_updates
        .values()
        .map(|cp| {
            json!({
                "source_name": cp.source_name,
                "source_file": cp.source_file,
                "source_inode": cp.source_inode,
                "source_generation": cp.source_generation,
                "last_offset": cp.last_offset,
                "last_line_no": cp.last_line_no,
                "status": cp.status,
            })
        })
        .collect();

    // `?` short-circuits on the first failed insert; checkpoints go last.
    let flush_result = async {
        clickhouse.insert_json_rows("raw_events", raw_rows).await?;
        clickhouse.insert_json_rows("events", event_rows).await?;
        clickhouse.insert_json_rows("event_links", link_rows).await?;
        clickhouse.insert_json_rows("tool_io", tool_rows).await?;
        clickhouse.insert_json_rows("ingest_errors", error_rows).await?;
        clickhouse
            .insert_json_rows("ingest_checkpoints", &checkpoint_rows)
            .await?;
        Result::<()>::Ok(())
    }
    .await;

    match flush_result {
        Ok(()) => {
            metrics
                .raw_rows_written
                .fetch_add(raw_rows.len() as u64, Ordering::Relaxed);
            metrics
                .event_rows_written
                .fetch_add(event_rows.len() as u64, Ordering::Relaxed);
            metrics
                .err_rows_written
                .fetch_add(error_rows.len() as u64, Ordering::Relaxed);
            metrics
                .last_flush_ms
                .store(started.elapsed().as_millis() as u64, Ordering::Relaxed);

            // Publish the flushed checkpoints so process_file resumes from
            // the new committed offsets.
            {
                let mut state = checkpoints.write().await;
                for cp in checkpoint_updates.values() {
                    let key = checkpoint_key(&cp.source_name, &cp.source_file);
                    state.insert(key, cp.clone());
                }
            }

            raw_rows.clear();
            event_rows.clear();
            link_rows.clear();
            tool_rows.clear();
            error_rows.clear();
            checkpoint_updates.clear();
        }
        Err(exc) => {
            // Keep all buffers: the next size- or timer-driven flush retries.
            metrics.flush_failures.fetch_add(1, Ordering::Relaxed);
            *metrics
                .last_error
                .lock()
                .expect("metrics last_error mutex poisoned") = exc.to_string();
            warn!("flush failed: {exc}");
        }
    }
}
   649  
   650 async fn enqueue_work(
   651     work: WorkItem,
   652     process_tx: &mpsc::Sender<WorkItem>,
   653     dispatch: &Arc<Mutex<DispatchState>>,
   654     metrics: &Arc<Metrics>,
   655 ) {
   656     if !work.path.ends_with(".jsonl") {
   657         return;
   658     }
   659  
   660     let key = work.key();
   661     let mut should_send = false;
   662     {
   663         let mut state = dispatch.lock().expect("dispatch mutex poisoned");
   664         state.item_by_key.insert(key.clone(), work.clone());
   665         if state.inflight.contains(&key) {
   666             state.dirty.insert(key.clone());
   667         } else if state.pending.insert(key.clone()) {
   668             should_send = true;
   669         }
   670     }
   671  
   672     if should_send {
   673         if process_tx.send(work).await.is_ok() {
   674             metrics.queue_depth.fetch_add(1, Ordering::Relaxed);
   675         }
   676     }
   677 }
   678  
   679 async fn process_file(
   680     config: &AppConfig,
   681     work: &WorkItem,
   682     checkpoints: Arc<RwLock<HashMap<String, Checkpoint>>>,
   683     sink_tx: mpsc::Sender<SinkMessage>,
   684     metrics: &Arc<Metrics>,
   685 ) -> Result<()> {
   686     let source_file = &work.path;
   687  
   688     let meta = match std::fs::metadata(source_file) {
   689         Ok(meta) => meta,
   690         Err(exc) => {
   691             debug!("metadata missing for {}: {}", source_file, exc);
   692             return Ok(());
   693         }
   694     };
   695  
   696     #[cfg(unix)]
   697     let inode = meta.ino();
   698     #[cfg(not(unix))]
   699     let inode = 0u64;
   700  
   701     let file_size = meta.len();
   702     let cp_key = checkpoint_key(&work.source_name, source_file);
   703     let committed = { checkpoints.read().await.get(&cp_key).cloned() };
   704  
   705     let mut checkpoint = committed.unwrap_or(Checkpoint {
   706         source_name: work.source_name.clone(),
   707         source_file: source_file.to_string(),
   708         source_inode: inode,
   709         source_generation: 1,
   710         last_offset: 0,
   711         last_line_no: 0,
   712         status: "active".to_string(),
   713     });
   714  
   715     let mut generation_changed = false;
   716     if checkpoint.source_inode != inode || file_size < checkpoint.last_offset {
   717         checkpoint.source_inode = inode;
   718         checkpoint.source_generation = checkpoint.source_generation.saturating_add(1).max(1);
   719         checkpoint.last_offset = 0;
   720         checkpoint.last_line_no = 0;
   721         checkpoint.status = "active".to_string();
   722         generation_changed = true;
   723     }
   724  
   725     if file_size == checkpoint.last_offset && !generation_changed {
   726         return Ok(());
   727     }
   728  
   729     let mut file = std::fs::File::open(source_file)
   730         .with_context(|| format!("failed to open {}", source_file))?;
   731     file.seek(SeekFrom::Start(checkpoint.last_offset))
   732         .with_context(|| format!("failed to seek {}", source_file))?;
   733  
   734     let mut reader = BufReader::new(file);
   735     let mut offset = checkpoint.last_offset;
   736     let mut line_no = checkpoint.last_line_no;
   737     let mut session_hint = String::new();
   738     let mut model_hint = String::new();
   739  
   740     let mut batch = RowBatch::default();
   741  
   742     loop {
   743         let start_offset = offset;
   744         let mut buf = Vec::<u8>::new();
   745         let bytes_read = reader
   746             .read_until(b'\n', &mut buf)
   747             .with_context(|| format!("failed reading {}", source_file))?;
   748  
   749         if bytes_read == 0 {
   750             break;
   751         }
   752  
   753         offset = offset.saturating_add(bytes_read as u64);
   754         line_no = line_no.saturating_add(1);
   755  
   756         let mut text = String::from_utf8_lossy(&buf).to_string();
   757         if text.ends_with('\n') {
   758             text.pop();
   759         }
   760  
   761         if text.trim().is_empty() {
   762             continue;
   763         }
   764  
   765         let parsed: Value = match serde_json::from_str::<Value>(&text) {
   766             Ok(value) if value.is_object() => value,
   767             Ok(_) => {
   768                 batch.error_rows.push(json!({
   769                     "source_name": work.source_name,
   770                     "provider": work.provider,
   771                     "source_file": source_file,
   772                     "source_inode": inode,
   773                     "source_generation": checkpoint.source_generation,
   774                     "source_line_no": line_no,
   775                     "source_offset": start_offset,
   776                     "error_kind": "json_parse_error",
   777                     "error_text": "Expected JSON object",
   778                     "raw_fragment": truncate(&text, 20_000),
   779                 }));
   780                 continue;
   781             }
   782             Err(exc) => {
   783                 batch.error_rows.push(json!({
   784                     "source_name": work.source_name,
   785                     "provider": work.provider,
   786                     "source_file": source_file,
   787                     "source_inode": inode,
   788                     "source_generation": checkpoint.source_generation,
   789                     "source_line_no": line_no,
   790                     "source_offset": start_offset,
   791                     "error_kind": "json_parse_error",
   792                     "error_text": exc.to_string(),
   793                     "raw_fragment": truncate(&text, 20_000),
   794                 }));
   795                 continue;
   796             }
   797         };
   798  
   799         let normalized = normalize_record(
   800             &parsed,
   801             &work.source_name,
   802             &work.provider,
   803             source_file,
   804             inode,
   805             checkpoint.source_generation,
   806             line_no,
   807             start_offset,
   808             &session_hint,
   809             &model_hint,
   810         );
   811  
   812         session_hint = normalized.session_hint;
   813         model_hint = normalized.model_hint;
   814         batch.raw_rows.push(normalized.raw_row);
   815         batch.event_rows.extend(normalized.event_rows);
   816         batch.link_rows.extend(normalized.link_rows);
   817         batch.tool_rows.extend(normalized.tool_rows);
   818         batch.lines_processed = batch.lines_processed.saturating_add(1);
   819  
   820         if batch.row_count() >= config.ingest.batch_size {
   821             let mut chunk = RowBatch::default();
   822             chunk.raw_rows = std::mem::take(&mut batch.raw_rows);
   823             chunk.event_rows = std::mem::take(&mut batch.event_rows);
   824             chunk.link_rows = std::mem::take(&mut batch.link_rows);
   825             chunk.tool_rows = std::mem::take(&mut batch.tool_rows);
   826             chunk.error_rows = std::mem::take(&mut batch.error_rows);
   827             chunk.lines_processed = batch.lines_processed;
   828             batch.lines_processed = 0;
   829             chunk.checkpoint = Some(Checkpoint {
   830                 source_name: work.source_name.clone(),
   831                 source_file: source_file.to_string(),
   832                 source_inode: inode,
   833                 source_generation: checkpoint.source_generation,
   834                 last_offset: offset,
   835                 last_line_no: line_no,
   836                 status: "active".to_string(),
   837             });
   838  
   839             sink_tx
   840                 .send(SinkMessage::Batch(chunk))
   841                 .await
   842                 .context("sink channel closed while sending chunk")?;
   843         }
   844     }
   845  
   846     let final_checkpoint = Checkpoint {
   847         source_name: work.source_name.clone(),
   848         source_file: source_file.to_string(),
   849         source_inode: inode,
   850         source_generation: checkpoint.source_generation,
   851         last_offset: offset,
   852         last_line_no: line_no,
   853         status: "active".to_string(),
   854     };
   855  
   856     if batch.row_count() > 0 || generation_changed || offset != checkpoint.last_offset {
   857         batch.checkpoint = Some(final_checkpoint);
   858         sink_tx
   859             .send(SinkMessage::Batch(batch))
   860             .await
   861             .context("sink channel closed while sending final batch")?;
   862     }
   863  
   864     if metrics.queue_depth.load(Ordering::Relaxed) == 0 {
   865         debug!(
   866             "{}:{} caught up at offset {}",
   867             work.source_name, source_file, offset
   868         );
   869     }
   870  
   871     Ok(())
   872 }
   873  
   874 fn truncate(input: &str, max_chars: usize) -> String {
   875     if input.chars().count() <= max_chars {
   876         return input.to_string();
   877     }
   878     input.chars().take(max_chars).collect()
   879 }
   880  
   881 fn enumerate_jsonl_files(glob_pattern: &str) -> Result<Vec<String>> {
   882     let mut files = Vec::<String>::new();
   883     for entry in glob(glob_pattern).with_context(|| format!("invalid glob: {}", glob_pattern))? {
   884         let path = match entry {
   885             Ok(path) => path,
   886             Err(exc) => {
   887                 warn!("glob iteration error: {exc}");
   888                 continue;
   889             }
   890         };
   891  
   892         if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
   893             files.push(path.to_string_lossy().to_string());
   894         }
   895     }
   896     files.sort();
   897     Ok(files)
   898 }
   899  
   900 fn host_name() -> String {
   901     std::env::var("HOSTNAME")
   902         .ok()
   903         .filter(|s| !s.trim().is_empty())
   904         .or_else(|| std::env::var("USER").ok())
   905         .unwrap_or_else(|| "localhost".to_string())
   906 }