Skip to content

crates/moraine-ingest-core/src/dispatch.rs


     1 use crate::checkpoint::checkpoint_key;
     2 use crate::model::{Checkpoint, RowBatch};
     3 use crate::normalize::normalize_record;
     4 use crate::{DispatchState, Metrics, SinkMessage, WorkItem};
     5 use anyhow::{Context, Result};
     6 use moraine_config::AppConfig;
     7 use serde_json::{json, Value};
     8 use std::collections::HashMap;
     9 #[cfg(not(unix))]
    10 use std::hash::{Hash, Hasher};
    11 use std::io::{BufRead, BufReader, Seek, SeekFrom};
    12 use std::sync::atomic::Ordering;
    13 use std::sync::{Arc, Mutex};
    14 use std::time::{Duration, Instant};
    15 use tokio::sync::{mpsc, RwLock};
    16 use tokio::task::JoinHandle;
    17 use tracing::debug;
    18  
    19 #[cfg(not(unix))]
    20 use same_file::Handle;
    21 #[cfg(unix)]
    22 use std::os::unix::fs::MetadataExt;
    23 #[cfg(not(unix))]
    24 use std::time::UNIX_EPOCH;
    25  
    26 pub(crate) fn spawn_debounce_task(
    27     config: AppConfig,
    28     mut rx: mpsc::UnboundedReceiver<WorkItem>,
    29     process_tx: mpsc::Sender<WorkItem>,
    30     dispatch: Arc<Mutex<DispatchState>>,
    31     metrics: Arc<Metrics>,
    32 ) -> JoinHandle<()> {
    33     tokio::spawn(async move {
    34         let debounce = Duration::from_millis(config.ingest.debounce_ms.max(5));
    35         let mut pending = HashMap::<String, (WorkItem, Instant)>::new();
    36         let mut tick = tokio::time::interval(Duration::from_millis(
    37             (config.ingest.debounce_ms / 2).max(10),
    38         ));
    39  
    40         loop {
    41             tokio::select! {
    42                 maybe_work = rx.recv() => {
    43                     match maybe_work {
    44                         Some(work) => {
    45                             pending.insert(work.key(), (work, Instant::now()));
    46                         }
    47                         None => break,
    48                     }
    49                 }
    50                 _ = tick.tick() => {
    51                     if pending.is_empty() {
    52                         continue;
    53                     }
    54  
    55                     let now = Instant::now();
    56                     let ready: Vec<String> = pending
    57                         .iter()
    58                         .filter_map(|(key, (_, seen_at))| {
    59                             if now.duration_since(*seen_at) >= debounce {
    60                                 Some(key.clone())
    61                             } else {
    62                                 None
    63                             }
    64                         })
    65                         .collect();
    66  
    67                     for key in ready {
    68                         if let Some((work, _)) = pending.remove(&key) {
    69                             if !work.path.ends_with(".jsonl") {
    70                                 continue;
    71                             }
    72  
    73                             enqueue_work(work, &process_tx, &dispatch, &metrics).await;
    74                         }
    75                     }
    76                 }
    77             }
    78         }
    79     })
    80 }
    81  
    82 pub(crate) async fn enqueue_work(
    83     work: WorkItem,
    84     process_tx: &mpsc::Sender<WorkItem>,
    85     dispatch: &Arc<Mutex<DispatchState>>,
    86     metrics: &Arc<Metrics>,
    87 ) {
    88     if !work.path.ends_with(".jsonl") {
    89         return;
    90     }
    91  
    92     let key = work.key();
    93     let mut should_send = false;
    94     {
    95         let mut state = dispatch.lock().expect("dispatch mutex poisoned");
    96         state.item_by_key.insert(key.clone(), work.clone());
    97         if state.inflight.contains(&key) {
    98             state.dirty.insert(key.clone());
    99         } else if state.pending.insert(key.clone()) {
   100             should_send = true;
   101         }
   102     }
   103  
   104     if should_send {
   105         if process_tx.send(work).await.is_ok() {
   106             metrics.queue_depth.fetch_add(1, Ordering::Relaxed);
   107         }
   108     }
   109 }
   110  
   111 pub(crate) fn complete_work(key: &str, dispatch: &Arc<Mutex<DispatchState>>) -> Option<WorkItem> {
   112     let mut state = dispatch.lock().expect("dispatch mutex poisoned");
   113     state.inflight.remove(key);
   114  
   115     if state.dirty.remove(key) {
   116         if state.pending.insert(key.to_string()) {
   117             return state.item_by_key.get(key).cloned();
   118         }
   119         return None;
   120     }
   121  
   122     if !state.pending.contains(key) && !state.inflight.contains(key) && !state.dirty.contains(key) {
   123         state.item_by_key.remove(key);
   124     }
   125  
   126     None
   127 }
   128  
   129 pub(crate) async fn process_file(
   130     config: &AppConfig,
   131     work: &WorkItem,
   132     checkpoints: Arc<RwLock<HashMap<String, Checkpoint>>>,
   133     sink_tx: mpsc::Sender<SinkMessage>,
   134     metrics: &Arc<Metrics>,
   135 ) -> Result<()> {
   136     let source_file = &work.path;
   137  
   138     let meta = match std::fs::metadata(source_file) {
   139         Ok(meta) => meta,
   140         Err(exc) => {
   141             debug!("metadata missing for {}: {}", source_file, exc);
   142             return Ok(());
   143         }
   144     };
   145  
   146     let inode = source_inode_for_file(source_file, &meta);
   147  
   148     let file_size = meta.len();
   149     let cp_key = checkpoint_key(&work.source_name, source_file);
   150     let committed = { checkpoints.read().await.get(&cp_key).cloned() };
   151  
   152     let mut checkpoint = committed.unwrap_or(Checkpoint {
   153         source_name: work.source_name.clone(),
   154         source_file: source_file.to_string(),
   155         source_inode: inode,
   156         source_generation: 1,
   157         last_offset: 0,
   158         last_line_no: 0,
   159         status: "active".to_string(),
   160     });
   161  
   162     let mut generation_changed = false;
   163     if checkpoint.source_inode != inode || file_size < checkpoint.last_offset {
   164         checkpoint.source_inode = inode;
   165         checkpoint.source_generation = checkpoint.source_generation.saturating_add(1).max(1);
   166         checkpoint.last_offset = 0;
   167         checkpoint.last_line_no = 0;
   168         checkpoint.status = "active".to_string();
   169         generation_changed = true;
   170     }
   171  
   172     if file_size == checkpoint.last_offset && !generation_changed {
   173         return Ok(());
   174     }
   175  
   176     let mut file = std::fs::File::open(source_file)
   177         .with_context(|| format!("failed to open {}", source_file))?;
   178     file.seek(SeekFrom::Start(checkpoint.last_offset))
   179         .with_context(|| format!("failed to seek {}", source_file))?;
   180  
   181     let mut reader = BufReader::new(file);
   182     let mut offset = checkpoint.last_offset;
   183     let mut line_no = checkpoint.last_line_no;
   184     let mut session_hint = String::new();
   185     let mut model_hint = String::new();
   186  
   187     let mut batch = RowBatch::default();
   188  
   189     loop {
   190         let start_offset = offset;
   191         let mut buf = Vec::<u8>::new();
   192         let bytes_read = reader
   193             .read_until(b'\n', &mut buf)
   194             .with_context(|| format!("failed reading {}", source_file))?;
   195  
   196         if bytes_read == 0 {
   197             break;
   198         }
   199  
   200         offset = offset.saturating_add(bytes_read as u64);
   201         line_no = line_no.saturating_add(1);
   202  
   203         let mut text = String::from_utf8_lossy(&buf).to_string();
   204         if text.ends_with('\n') {
   205             text.pop();
   206         }
   207  
   208         if text.trim().is_empty() {
   209             continue;
   210         }
   211  
   212         let parsed: Value = match serde_json::from_str::<Value>(&text) {
   213             Ok(value) if value.is_object() => value,
   214             Ok(_) => {
   215                 batch.error_rows.push(json!({
   216                     "source_name": work.source_name,
   217                     "provider": work.provider,
   218                     "source_file": source_file,
   219                     "source_inode": inode,
   220                     "source_generation": checkpoint.source_generation,
   221                     "source_line_no": line_no,
   222                     "source_offset": start_offset,
   223                     "error_kind": "json_parse_error",
   224                     "error_text": "Expected JSON object",
   225                     "raw_fragment": truncate(&text, 20_000),
   226                 }));
   227                 continue;
   228             }
   229             Err(exc) => {
   230                 batch.error_rows.push(json!({
   231                     "source_name": work.source_name,
   232                     "provider": work.provider,
   233                     "source_file": source_file,
   234                     "source_inode": inode,
   235                     "source_generation": checkpoint.source_generation,
   236                     "source_line_no": line_no,
   237                     "source_offset": start_offset,
   238                     "error_kind": "json_parse_error",
   239                     "error_text": exc.to_string(),
   240                     "raw_fragment": truncate(&text, 20_000),
   241                 }));
   242                 continue;
   243             }
   244         };
   245  
   246         let normalized = match normalize_record(
   247             &parsed,
   248             &work.source_name,
   249             &work.provider,
   250             source_file,
   251             inode,
   252             checkpoint.source_generation,
   253             line_no,
   254             start_offset,
   255             &session_hint,
   256             &model_hint,
   257         ) {
   258             Ok(normalized) => normalized,
   259             Err(exc) => {
   260                 batch.error_rows.push(json!({
   261                     "source_name": work.source_name,
   262                     "provider": work.provider,
   263                     "source_file": source_file,
   264                     "source_inode": inode,
   265                     "source_generation": checkpoint.source_generation,
   266                     "source_line_no": line_no,
   267                     "source_offset": start_offset,
   268                     "error_kind": "normalize_error",
   269                     "error_text": exc.to_string(),
   270                     "raw_fragment": truncate(&text, 20_000),
   271                 }));
   272                 continue;
   273             }
   274         };
   275  
   276         session_hint = normalized.session_hint;
   277         model_hint = normalized.model_hint;
   278         batch.raw_rows.push(normalized.raw_row);
   279         batch.event_rows.extend(normalized.event_rows);
   280         batch.link_rows.extend(normalized.link_rows);
   281         batch.tool_rows.extend(normalized.tool_rows);
   282         batch.error_rows.extend(normalized.error_rows);
   283         batch.lines_processed = batch.lines_processed.saturating_add(1);
   284  
   285         if batch.row_count() >= config.ingest.batch_size {
   286             let mut chunk = RowBatch::default();
   287             chunk.raw_rows = std::mem::take(&mut batch.raw_rows);
   288             chunk.event_rows = std::mem::take(&mut batch.event_rows);
   289             chunk.link_rows = std::mem::take(&mut batch.link_rows);
   290             chunk.tool_rows = std::mem::take(&mut batch.tool_rows);
   291             chunk.error_rows = std::mem::take(&mut batch.error_rows);
   292             chunk.lines_processed = batch.lines_processed;
   293             batch.lines_processed = 0;
   294             chunk.checkpoint = Some(Checkpoint {
   295                 source_name: work.source_name.clone(),
   296                 source_file: source_file.to_string(),
   297                 source_inode: inode,
   298                 source_generation: checkpoint.source_generation,
   299                 last_offset: offset,
   300                 last_line_no: line_no,
   301                 status: "active".to_string(),
   302             });
   303  
   304             sink_tx
   305                 .send(SinkMessage::Batch(chunk))
   306                 .await
   307                 .context("sink channel closed while sending chunk")?;
   308         }
   309     }
   310  
   311     let final_checkpoint = Checkpoint {
   312         source_name: work.source_name.clone(),
   313         source_file: source_file.to_string(),
   314         source_inode: inode,
   315         source_generation: checkpoint.source_generation,
   316         last_offset: offset,
   317         last_line_no: line_no,
   318         status: "active".to_string(),
   319     };
   320  
   321     if batch.row_count() > 0 || generation_changed || offset != checkpoint.last_offset {
   322         batch.checkpoint = Some(final_checkpoint);
   323         sink_tx
   324             .send(SinkMessage::Batch(batch))
   325             .await
   326             .context("sink channel closed while sending final batch")?;
   327     }
   328  
   329     if metrics.queue_depth.load(Ordering::Relaxed) == 0 {
   330         debug!(
   331             "{}:{} caught up at offset {}",
   332             work.source_name, source_file, offset
   333         );
   334     }
   335  
   336     Ok(())
   337 }
   338  
   339 fn source_inode_for_file(source_file: &str, meta: &std::fs::Metadata) -> u64 {
   340     #[cfg(unix)]
   341     {
   342         let _ = source_file;
   343         meta.ino()
   344     }
   345  
   346     #[cfg(not(unix))]
   347     {
   348         non_unix_source_inode(source_file, meta)
   349     }
   350 }
   351  
   352 #[cfg(not(unix))]
   353 fn non_unix_source_inode(source_file: &str, meta: &std::fs::Metadata) -> u64 {
   354     if let Ok(handle) = Handle::from_path(source_file) {
   355         let id = hash_identity(&handle);
   356         if id != 0 {
   357             return id;
   358         }
   359     }
   360  
   361     // Fallback when a platform file handle identity is unavailable.
   362     let mut hasher = std::collections::hash_map::DefaultHasher::new();
   363     source_file.hash(&mut hasher);
   364     if let Ok(created_at) = meta.created() {
   365         if let Ok(since_epoch) = created_at.duration_since(UNIX_EPOCH) {
   366             since_epoch.as_nanos().hash(&mut hasher);
   367         }
   368     }
   369  
   370     let id = hasher.finish();
   371     if id == 0 {
   372         1
   373     } else {
   374         id
   375     }
   376 }
   377  
   378 #[cfg(not(unix))]
   379 fn hash_identity(value: &impl Hash) -> u64 {
   380     let mut hasher = std::collections::hash_map::DefaultHasher::new();
   381     value.hash(&mut hasher);
   382     hasher.finish()
   383 }
   384  
   385 fn truncate(input: &str, max_chars: usize) -> String {
   386     if input.chars().count() <= max_chars {
   387         return input.to_string();
   388     }
   389     input.chars().take(max_chars).collect()
   390 }
   391  
#[cfg(test)]
mod tests {
    use super::{complete_work, source_inode_for_file};
    use crate::{DispatchState, WorkItem};
    use std::fs;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Build a minimal `WorkItem` for `path` with fixed source/provider names.
    fn sample_work(path: &str) -> WorkItem {
        WorkItem {
            source_name: "test-source".to_string(),
            provider: "test-provider".to_string(),
            path: path.to_string(),
        }
    }

    /// Temp-dir path made unique with a nanosecond timestamp so concurrent
    /// or repeated test runs do not collide on the same file.
    fn unique_test_file(name: &str) -> PathBuf {
        let suffix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock before unix epoch")
            .as_nanos();
        std::env::temp_dir().join(format!("moraine-dispatch-{name}-{suffix}.jsonl"))
    }

    /// Completing an in-flight item with no pending/dirty markers must drop
    /// it from every tracking collection, including `item_by_key`.
    #[test]
    fn complete_work_prunes_idle_item() {
        let dispatch = Arc::new(Mutex::new(DispatchState::default()));
        let work = sample_work("/tmp/idle.jsonl");
        let key = work.key();

        {
            // Simulate the item being in flight with nothing queued behind it.
            let mut state = dispatch.lock().expect("dispatch mutex poisoned");
            state.inflight.insert(key.clone());
            state.item_by_key.insert(key.clone(), work);
        }

        let reschedule = complete_work(&key, &dispatch);
        assert!(reschedule.is_none());

        let state = dispatch.lock().expect("dispatch mutex poisoned");
        assert!(!state.inflight.contains(&key));
        assert!(!state.pending.contains(&key));
        assert!(!state.dirty.contains(&key));
        assert!(!state.item_by_key.contains_key(&key));
    }

    /// Completing an item that was dirtied mid-flight must return the cached
    /// work item and move the key from dirty back to pending.
    #[test]
    fn complete_work_reschedules_dirty_item() {
        let dispatch = Arc::new(Mutex::new(DispatchState::default()));
        let work = sample_work("/tmp/dirty.jsonl");
        let key = work.key();

        {
            // Simulate a change arriving while the item was being processed.
            let mut state = dispatch.lock().expect("dispatch mutex poisoned");
            state.inflight.insert(key.clone());
            state.dirty.insert(key.clone());
            state.item_by_key.insert(key.clone(), work.clone());
        }

        let reschedule = complete_work(&key, &dispatch);
        assert_eq!(
            reschedule.as_ref().map(|item| item.path.as_str()),
            Some(work.path.as_str())
        );

        let state = dispatch.lock().expect("dispatch mutex poisoned");
        assert!(!state.inflight.contains(&key));
        assert!(!state.dirty.contains(&key));
        assert!(state.pending.contains(&key));
        assert!(state.item_by_key.contains_key(&key));
    }

    /// A key that is still pending (queued but not in flight) must keep its
    /// cached item so the upcoming pass can use it.
    #[test]
    fn complete_work_keeps_item_when_still_pending() {
        let dispatch = Arc::new(Mutex::new(DispatchState::default()));
        let work = sample_work("/tmp/pending.jsonl");
        let key = work.key();

        {
            let mut state = dispatch.lock().expect("dispatch mutex poisoned");
            state.pending.insert(key.clone());
            state.item_by_key.insert(key.clone(), work);
        }

        let reschedule = complete_work(&key, &dispatch);
        assert!(reschedule.is_none());

        let state = dispatch.lock().expect("dispatch mutex poisoned");
        assert!(state.pending.contains(&key));
        assert!(state.item_by_key.contains_key(&key));
    }

    /// Appending to a file must not change its identity: same path, same
    /// underlying file, same id.
    #[test]
    fn source_inode_is_stable_for_same_file() {
        let path = unique_test_file("identity-stable");
        fs::write(&path, "{\"line\":1}\n").expect("write initial file");
        let source_file = path.to_string_lossy().to_string();

        let first_meta = fs::metadata(&path).expect("metadata for initial file");
        let first_id = source_inode_for_file(&source_file, &first_meta);
        assert_ne!(first_id, 0);

        // Grow the file in place, then recompute the identity.
        fs::write(&path, "{\"line\":1}\n{\"line\":2}\n").expect("append file content");
        let second_meta = fs::metadata(&path).expect("metadata after append");
        let second_id = source_inode_for_file(&source_file, &second_meta);

        // Cleanup happens before the assertion so a failure does not leave
        // the temp file behind.
        let _ = fs::remove_file(&path);
        assert_eq!(first_id, second_id);
    }

    /// Atomically replacing the file (rotation via rename) must yield a new
    /// identity even though the path is unchanged.
    #[test]
    fn source_inode_changes_when_file_is_replaced() {
        let path = unique_test_file("identity-replaced");
        let replacement = unique_test_file("identity-replacement");
        fs::write(&path, "{\"line\":1}\n").expect("write original file");
        let source_file = path.to_string_lossy().to_string();

        let original_meta = fs::metadata(&path).expect("metadata for original file");
        let original_id = source_inode_for_file(&source_file, &original_meta);
        assert_ne!(original_id, 0);

        // Mimic log rotation: write a new file, then rename it over the old.
        fs::write(&replacement, "{\"line\":99}\n").expect("write replacement file");
        fs::rename(&replacement, &path).expect("replace file via rename");

        let replaced_meta = fs::metadata(&path).expect("metadata for replaced file");
        let replaced_id = source_inode_for_file(&source_file, &replaced_meta);

        // Cleanup happens before the assertion so a failure does not leave
        // the temp file behind.
        let _ = fs::remove_file(&path);
        assert_ne!(original_id, replaced_id);
    }
}