Skip to content

crates/moraine-ingest-core/src/watch.rs


     1 use crate::{
     2     Metrics, WorkItem, WATCHER_BACKEND_MIXED, WATCHER_BACKEND_NATIVE, WATCHER_BACKEND_POLL,
     3     WATCHER_BACKEND_UNKNOWN,
     4 };
     5 use anyhow::{Context, Result};
     6 use glob::glob;
     7 use moraine_config::IngestSource;
     8 use notify::{
     9     event::{EventKind, ModifyKind},
    10     Config as NotifyConfig, Event, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
    11 };
    12 use std::collections::BTreeSet;
    13 use std::sync::atomic::Ordering;
    14 use std::sync::Arc;
    15 use std::time::{Duration, SystemTime, UNIX_EPOCH};
    16 use tokio::sync::mpsc;
    17 use tracing::{info, warn};
    18  
    19 enum ActiveWatcher {
    20     Recommended(RecommendedWatcher),
    21     Poll(PollWatcher),
    22 }
    23  
    24 impl ActiveWatcher {
    25     fn watch(&mut self, path: &std::path::Path, mode: RecursiveMode) -> notify::Result<()> {
    26         match self {
    27             Self::Recommended(watcher) => watcher.watch(path, mode),
    28             Self::Poll(watcher) => watcher.watch(path, mode),
    29         }
    30     }
    31 }
    32  
    33 #[derive(Clone, Copy)]
    34 enum WatcherBackend {
    35     Native,
    36     Poll,
    37 }
    38  
    39 impl WatcherBackend {
    40     fn state(self) -> u64 {
    41         match self {
    42             Self::Native => WATCHER_BACKEND_NATIVE,
    43             Self::Poll => WATCHER_BACKEND_POLL,
    44         }
    45     }
    46 }
    47  
    48 fn unix_ms_now() -> u64 {
    49     SystemTime::now()
    50         .duration_since(UNIX_EPOCH)
    51         .unwrap_or_default()
    52         .as_millis() as u64
    53 }
    54  
    55 struct WatchRegistration {
    56     metrics: Arc<Metrics>,
    57     registered: bool,
    58 }
    59  
    60 impl WatchRegistration {
    61     fn new(metrics: Arc<Metrics>) -> Self {
    62         Self {
    63             metrics,
    64             registered: false,
    65         }
    66     }
    67  
    68     fn mark_registered(&mut self) {
    69         if self.registered {
    70             return;
    71         }
    72         self.metrics
    73             .watcher_registrations
    74             .fetch_add(1, Ordering::Relaxed);
    75         self.registered = true;
    76     }
    77 }
    78  
    79 impl Drop for WatchRegistration {
    80     fn drop(&mut self) {
    81         if !self.registered {
    82             return;
    83         }
    84         self.metrics
    85             .watcher_registrations
    86             .fetch_sub(1, Ordering::Relaxed);
    87     }
    88 }
    89  
    90 fn record_backend(metrics: &Arc<Metrics>, backend: WatcherBackend) {
    91     let next = backend.state();
    92     let mut current = metrics.watcher_backend_state.load(Ordering::Relaxed);
    93  
    94     loop {
    95         let merged = match (current, next) {
    96             (WATCHER_BACKEND_UNKNOWN, value) => value,
    97             (value, next_value) if value == next_value => value,
    98             _ => WATCHER_BACKEND_MIXED,
    99         };
   100  
   101         match metrics.watcher_backend_state.compare_exchange(
   102             current,
   103             merged,
   104             Ordering::Relaxed,
   105             Ordering::Relaxed,
   106         ) {
   107             Ok(_) => return,
   108             Err(observed) => current = observed,
   109         }
   110     }
   111 }
   112  
   113 fn record_watcher_error(metrics: &Arc<Metrics>, message: &str) {
   114     metrics.watcher_error_count.fetch_add(1, Ordering::Relaxed);
   115     *metrics
   116         .last_error
   117         .lock()
   118         .expect("metrics last_error mutex poisoned") = message.to_string();
   119 }
   120  
   121 fn record_rescan(metrics: &Arc<Metrics>) {
   122     metrics.watcher_reset_count.fetch_add(1, Ordering::Relaxed);
   123     metrics
   124         .watcher_last_reset_unix_ms
   125         .store(unix_ms_now(), Ordering::Relaxed);
   126 }
   127  
   128 fn event_requires_rescan(event: &Event) -> bool {
   129     event.paths.is_empty() || event.need_rescan()
   130 }
   131  
   132 fn event_is_relevant(kind: &EventKind) -> bool {
   133     match kind {
   134         EventKind::Any | EventKind::Create(_) => true,
   135         EventKind::Modify(modify_kind) => matches!(
   136             modify_kind,
   137             ModifyKind::Any | ModifyKind::Data(_) | ModifyKind::Name(_)
   138         ),
   139         _ => false,
   140     }
   141 }
   142  
   143 fn event_jsonl_paths(event: &Event) -> Vec<String> {
   144     let mut dedup = BTreeSet::<String>::new();
   145     for path in &event.paths {
   146         if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
   147             dedup.insert(path.to_string_lossy().to_string());
   148         }
   149     }
   150     dedup.into_iter().collect()
   151 }
   152  
   153 fn queue_rescan(
   154     glob_pattern: &str,
   155     source_name: &str,
   156     provider: &str,
   157     tx: &mpsc::UnboundedSender<WorkItem>,
   158     metrics: &Arc<Metrics>,
   159 ) {
   160     record_rescan(metrics);
   161     match enumerate_jsonl_files(glob_pattern) {
   162         Ok(paths) => {
   163             for path in paths {
   164                 let _ = tx.send(WorkItem {
   165                     source_name: source_name.to_string(),
   166                     provider: provider.to_string(),
   167                     path,
   168                 });
   169             }
   170         }
   171         Err(exc) => {
   172             warn!(
   173                 source = source_name,
   174                 provider,
   175                 glob_pattern,
   176                 error = %exc,
   177                 "watcher rescan failed to enumerate jsonl files"
   178             );
   179             record_watcher_error(
   180                 metrics,
   181                 &format!(
   182                     "rescan enumerate failed for source={source_name} provider={provider} glob={glob_pattern}: {exc}"
   183                 ),
   184             );
   185         }
   186     }
   187 }
   188  
   189 pub(crate) fn spawn_watcher_threads(
   190     sources: Vec<IngestSource>,
   191     tx: mpsc::UnboundedSender<WorkItem>,
   192     metrics: Arc<Metrics>,
   193 ) -> Result<Vec<std::thread::JoinHandle<()>>> {
   194     let mut handles = Vec::<std::thread::JoinHandle<()>>::new();
   195  
   196     for source in sources {
   197         let source_name = source.name.clone();
   198         let provider = source.provider.clone();
   199         let glob_pattern = source.glob.clone();
   200         let watch_root = std::path::PathBuf::from(source.watch_root.clone());
   201         let tx_clone = tx.clone();
   202         let metrics_clone = metrics.clone();
   203  
   204         info!(
   205             "starting watcher on {} (source={}, provider={})",
   206             watch_root.display(),
   207             source_name,
   208             provider
   209         );
   210  
   211         let handle = std::thread::spawn(move || {
   212             let (event_tx, event_rx) = std::sync::mpsc::channel::<notify::Result<Event>>();
   213             let native_tx = event_tx.clone();
   214             let mut registration = WatchRegistration::new(metrics_clone.clone());
   215  
   216             let mut watcher = match notify::recommended_watcher(move |res| {
   217                 let _ = native_tx.send(res);
   218             }) {
   219                 Ok(watcher) => {
   220                     record_backend(&metrics_clone, WatcherBackend::Native);
   221                     info!("watcher backend native (source={})", source_name);
   222                     ActiveWatcher::Recommended(watcher)
   223                 }
   224                 Err(exc) => {
   225                     eprintln!(
   226                         "[moraine-rust] failed to create native watcher for {}: {exc}; falling back to poll watcher",
   227                         source_name
   228                     );
   229                     record_watcher_error(
   230                         &metrics_clone,
   231                         &format!("native watcher create failed for {}: {exc}", source_name),
   232                     );
   233                     let poll_config =
   234                         NotifyConfig::default().with_poll_interval(Duration::from_secs(2));
   235                     match PollWatcher::new(
   236                         move |res| {
   237                             let _ = event_tx.send(res);
   238                         },
   239                         poll_config,
   240                     ) {
   241                         Ok(watcher) => {
   242                             record_backend(&metrics_clone, WatcherBackend::Poll);
   243                             info!("watcher backend poll (source={})", source_name);
   244                             ActiveWatcher::Poll(watcher)
   245                         }
   246                         Err(poll_exc) => {
   247                             eprintln!(
   248                                 "[moraine-rust] failed to create poll watcher for {}: {poll_exc}",
   249                                 source_name
   250                             );
   251                             record_watcher_error(
   252                                 &metrics_clone,
   253                                 &format!(
   254                                     "poll watcher create failed for {}: {poll_exc}",
   255                                     source_name
   256                                 ),
   257                             );
   258                             queue_rescan(
   259                                 &glob_pattern,
   260                                 &source_name,
   261                                 &provider,
   262                                 &tx_clone,
   263                                 &metrics_clone,
   264                             );
   265                             return;
   266                         }
   267                     }
   268                 }
   269             };
   270  
   271             if let Err(exc) = watcher.watch(watch_root.as_path(), RecursiveMode::Recursive) {
   272                 eprintln!(
   273                     "[moraine-rust] failed to watch {} ({}): {exc}",
   274                     watch_root.display(),
   275                     source_name
   276                 );
   277                 record_watcher_error(
   278                     &metrics_clone,
   279                     &format!(
   280                         "watch root register failed for {}:{}: {exc}",
   281                         source_name,
   282                         watch_root.display()
   283                     ),
   284                 );
   285                 queue_rescan(
   286                     &glob_pattern,
   287                     &source_name,
   288                     &provider,
   289                     &tx_clone,
   290                     &metrics_clone,
   291                 );
   292                 return;
   293             }
   294             registration.mark_registered();
   295  
   296             loop {
   297                 match event_rx.recv() {
   298                     Ok(Ok(event)) => {
   299                         if event_requires_rescan(&event) {
   300                             queue_rescan(
   301                                 &glob_pattern,
   302                                 &source_name,
   303                                 &provider,
   304                                 &tx_clone,
   305                                 &metrics_clone,
   306                             );
   307                             continue;
   308                         }
   309  
   310                         if !event_is_relevant(&event.kind) {
   311                             continue;
   312                         }
   313  
   314                         for path in event_jsonl_paths(&event) {
   315                             let _ = tx_clone.send(WorkItem {
   316                                 source_name: source_name.clone(),
   317                                 provider: provider.clone(),
   318                                 path,
   319                             });
   320                         }
   321                     }
   322                     Ok(Err(exc)) => {
   323                         eprintln!("[moraine-rust] watcher event error ({source_name}): {exc}");
   324                         record_watcher_error(
   325                             &metrics_clone,
   326                             &format!("watcher event error ({source_name}): {exc}"),
   327                         );
   328                         queue_rescan(
   329                             &glob_pattern,
   330                             &source_name,
   331                             &provider,
   332                             &tx_clone,
   333                             &metrics_clone,
   334                         );
   335                     }
   336                     Err(_) => break,
   337                 }
   338             }
   339         });
   340  
   341         handles.push(handle);
   342     }
   343  
   344     Ok(handles)
   345 }
   346  
   347 pub(crate) fn enumerate_jsonl_files(glob_pattern: &str) -> Result<Vec<String>> {
   348     let mut files = Vec::<String>::new();
   349     for entry in glob(glob_pattern).with_context(|| format!("invalid glob: {}", glob_pattern))? {
   350         let path = match entry {
   351             Ok(path) => path,
   352             Err(exc) => {
   353                 warn!("glob iteration error: {exc}");
   354                 continue;
   355             }
   356         };
   357  
   358         if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
   359             files.push(path.to_string_lossy().to_string());
   360         }
   361     }
   362     files.sort();
   363     Ok(files)
   364 }
   365  
   366 #[cfg(test)]
   367 mod tests {
   368     use super::*;
   369     use crate::Metrics;
   370     use notify::{
   371         event::{CreateKind, DataChange, Flag, ModifyKind, RenameMode},
   372         EventKind,
   373     };
   374     use std::path::PathBuf;
   375     use std::sync::atomic::Ordering;
   376     use tokio::sync::mpsc;
   377  
   378     #[test]
   379     fn rescan_events_require_reconcile() {
   380         let event = Event::new(EventKind::Other).set_flag(Flag::Rescan);
   381         assert!(event_requires_rescan(&event));
   382     }
   383  
   384     #[test]
   385     fn relevant_event_kinds_include_create_modify_data_and_rename() {
   386         assert!(event_is_relevant(&EventKind::Create(CreateKind::Any)));
   387         assert!(event_is_relevant(&EventKind::Modify(ModifyKind::Data(
   388             DataChange::Any
   389         ))));
   390         assert!(event_is_relevant(&EventKind::Modify(ModifyKind::Name(
   391             RenameMode::Any
   392         ))));
   393         assert!(!event_is_relevant(&EventKind::Remove(
   394             notify::event::RemoveKind::Any
   395         )));
   396     }
   397  
   398     #[test]
   399     fn jsonl_paths_are_deduped_and_filtered() {
   400         let mut event = Event::new(EventKind::Modify(ModifyKind::Data(DataChange::Any)));
   401         event.paths = vec![
   402             PathBuf::from("/tmp/a.jsonl"),
   403             PathBuf::from("/tmp/a.jsonl"),
   404             PathBuf::from("/tmp/b.txt"),
   405         ];
   406  
   407         let paths = event_jsonl_paths(&event);
   408         assert_eq!(paths, vec!["/tmp/a.jsonl".to_string()]);
   409     }
   410  
   411     #[test]
   412     fn watcher_registration_tracks_active_watches() {
   413         let metrics = Arc::new(Metrics::default());
   414         assert_eq!(metrics.watcher_registrations.load(Ordering::Relaxed), 0);
   415  
   416         {
   417             let mut registration = WatchRegistration::new(metrics.clone());
   418             registration.mark_registered();
   419             assert_eq!(metrics.watcher_registrations.load(Ordering::Relaxed), 1);
   420         }
   421  
   422         assert_eq!(metrics.watcher_registrations.load(Ordering::Relaxed), 0);
   423     }
   424  
   425     #[test]
   426     fn queue_rescan_records_enumeration_errors() {
   427         let metrics = Arc::new(Metrics::default());
   428         let (tx, mut rx) = mpsc::unbounded_channel();
   429  
   430         queue_rescan("[", "source-alpha", "provider-alpha", &tx, &metrics);
   431  
   432         assert!(rx.try_recv().is_err());
   433         assert_eq!(metrics.watcher_reset_count.load(Ordering::Relaxed), 1);
   434         assert_eq!(metrics.watcher_error_count.load(Ordering::Relaxed), 1);
   435  
   436         let last_error = metrics
   437             .last_error
   438             .lock()
   439             .expect("metrics last_error mutex poisoned")
   440             .clone();
   441         assert!(last_error.contains("rescan enumerate failed"));
   442         assert!(last_error.contains("source=source-alpha"));
   443         assert!(last_error.contains("provider=provider-alpha"));
   444         assert!(last_error.contains("glob=["));
   445     }
   446 }