crates/moraine-ingest-core/src/watch.rs
1 use crate::{
2 Metrics, WorkItem, WATCHER_BACKEND_MIXED, WATCHER_BACKEND_NATIVE, WATCHER_BACKEND_POLL,
3 WATCHER_BACKEND_UNKNOWN,
4 };
5 use anyhow::{Context, Result};
6 use glob::glob;
7 use moraine_config::IngestSource;
8 use notify::{
9 event::{EventKind, ModifyKind},
10 Config as NotifyConfig, Event, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
11 };
12 use std::collections::BTreeSet;
13 use std::sync::atomic::Ordering;
14 use std::sync::Arc;
15 use std::time::{Duration, SystemTime, UNIX_EPOCH};
16 use tokio::sync::mpsc;
17 use tracing::{info, warn};
18
19 enum ActiveWatcher {
20 Recommended(RecommendedWatcher),
21 Poll(PollWatcher),
22 }
23
24 impl ActiveWatcher {
25 fn watch(&mut self, path: &std::path::Path, mode: RecursiveMode) -> notify::Result<()> {
26 match self {
27 Self::Recommended(watcher) => watcher.watch(path, mode),
28 Self::Poll(watcher) => watcher.watch(path, mode),
29 }
30 }
31 }
32
33 #[derive(Clone, Copy)]
34 enum WatcherBackend {
35 Native,
36 Poll,
37 }
38
39 impl WatcherBackend {
40 fn state(self) -> u64 {
41 match self {
42 Self::Native => WATCHER_BACKEND_NATIVE,
43 Self::Poll => WATCHER_BACKEND_POLL,
44 }
45 }
46 }
47
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch the result is `0`.
/// The value is assembled from whole seconds and sub-second milliseconds with
/// saturating `u64` arithmetic, so there is no narrowing cast that could
/// silently truncate.
fn unix_ms_now() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    since_epoch
        .as_secs()
        .saturating_mul(1_000)
        .saturating_add(u64::from(since_epoch.subsec_millis()))
}
54
55 struct WatchRegistration {
56 metrics: Arc<Metrics>,
57 registered: bool,
58 }
59
60 impl WatchRegistration {
61 fn new(metrics: Arc<Metrics>) -> Self {
62 Self {
63 metrics,
64 registered: false,
65 }
66 }
67
68 fn mark_registered(&mut self) {
69 if self.registered {
70 return;
71 }
72 self.metrics
73 .watcher_registrations
74 .fetch_add(1, Ordering::Relaxed);
75 self.registered = true;
76 }
77 }
78
79 impl Drop for WatchRegistration {
80 fn drop(&mut self) {
81 if !self.registered {
82 return;
83 }
84 self.metrics
85 .watcher_registrations
86 .fetch_sub(1, Ordering::Relaxed);
87 }
88 }
89
90 fn record_backend(metrics: &Arc<Metrics>, backend: WatcherBackend) {
91 let next = backend.state();
92 let mut current = metrics.watcher_backend_state.load(Ordering::Relaxed);
93
94 loop {
95 let merged = match (current, next) {
96 (WATCHER_BACKEND_UNKNOWN, value) => value,
97 (value, next_value) if value == next_value => value,
98 _ => WATCHER_BACKEND_MIXED,
99 };
100
101 match metrics.watcher_backend_state.compare_exchange(
102 current,
103 merged,
104 Ordering::Relaxed,
105 Ordering::Relaxed,
106 ) {
107 Ok(_) => return,
108 Err(observed) => current = observed,
109 }
110 }
111 }
112
113 fn record_watcher_error(metrics: &Arc<Metrics>, message: &str) {
114 metrics.watcher_error_count.fetch_add(1, Ordering::Relaxed);
115 *metrics
116 .last_error
117 .lock()
118 .expect("metrics last_error mutex poisoned") = message.to_string();
119 }
120
121 fn record_rescan(metrics: &Arc<Metrics>) {
122 metrics.watcher_reset_count.fetch_add(1, Ordering::Relaxed);
123 metrics
124 .watcher_last_reset_unix_ms
125 .store(unix_ms_now(), Ordering::Relaxed);
126 }
127
128 fn event_requires_rescan(event: &Event) -> bool {
129 event.paths.is_empty() || event.need_rescan()
130 }
131
132 fn event_is_relevant(kind: &EventKind) -> bool {
133 match kind {
134 EventKind::Any | EventKind::Create(_) => true,
135 EventKind::Modify(modify_kind) => matches!(
136 modify_kind,
137 ModifyKind::Any | ModifyKind::Data(_) | ModifyKind::Name(_)
138 ),
139 _ => false,
140 }
141 }
142
143 fn event_jsonl_paths(event: &Event) -> Vec<String> {
144 let mut dedup = BTreeSet::<String>::new();
145 for path in &event.paths {
146 if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
147 dedup.insert(path.to_string_lossy().to_string());
148 }
149 }
150 dedup.into_iter().collect()
151 }
152
153 fn queue_rescan(
154 glob_pattern: &str,
155 source_name: &str,
156 provider: &str,
157 tx: &mpsc::UnboundedSender<WorkItem>,
158 metrics: &Arc<Metrics>,
159 ) {
160 record_rescan(metrics);
161 match enumerate_jsonl_files(glob_pattern) {
162 Ok(paths) => {
163 for path in paths {
164 let _ = tx.send(WorkItem {
165 source_name: source_name.to_string(),
166 provider: provider.to_string(),
167 path,
168 });
169 }
170 }
171 Err(exc) => {
172 warn!(
173 source = source_name,
174 provider,
175 glob_pattern,
176 error = %exc,
177 "watcher rescan failed to enumerate jsonl files"
178 );
179 record_watcher_error(
180 metrics,
181 &format!(
182 "rescan enumerate failed for source={source_name} provider={provider} glob={glob_pattern}: {exc}"
183 ),
184 );
185 }
186 }
187 }
188
189 pub(crate) fn spawn_watcher_threads(
190 sources: Vec<IngestSource>,
191 tx: mpsc::UnboundedSender<WorkItem>,
192 metrics: Arc<Metrics>,
193 ) -> Result<Vec<std::thread::JoinHandle<()>>> {
194 let mut handles = Vec::<std::thread::JoinHandle<()>>::new();
195
196 for source in sources {
197 let source_name = source.name.clone();
198 let provider = source.provider.clone();
199 let glob_pattern = source.glob.clone();
200 let watch_root = std::path::PathBuf::from(source.watch_root.clone());
201 let tx_clone = tx.clone();
202 let metrics_clone = metrics.clone();
203
204 info!(
205 "starting watcher on {} (source={}, provider={})",
206 watch_root.display(),
207 source_name,
208 provider
209 );
210
211 let handle = std::thread::spawn(move || {
212 let (event_tx, event_rx) = std::sync::mpsc::channel::<notify::Result<Event>>();
213 let native_tx = event_tx.clone();
214 let mut registration = WatchRegistration::new(metrics_clone.clone());
215
216 let mut watcher = match notify::recommended_watcher(move |res| {
217 let _ = native_tx.send(res);
218 }) {
219 Ok(watcher) => {
220 record_backend(&metrics_clone, WatcherBackend::Native);
221 info!("watcher backend native (source={})", source_name);
222 ActiveWatcher::Recommended(watcher)
223 }
224 Err(exc) => {
225 eprintln!(
226 "[moraine-rust] failed to create native watcher for {}: {exc}; falling back to poll watcher",
227 source_name
228 );
229 record_watcher_error(
230 &metrics_clone,
231 &format!("native watcher create failed for {}: {exc}", source_name),
232 );
233 let poll_config =
234 NotifyConfig::default().with_poll_interval(Duration::from_secs(2));
235 match PollWatcher::new(
236 move |res| {
237 let _ = event_tx.send(res);
238 },
239 poll_config,
240 ) {
241 Ok(watcher) => {
242 record_backend(&metrics_clone, WatcherBackend::Poll);
243 info!("watcher backend poll (source={})", source_name);
244 ActiveWatcher::Poll(watcher)
245 }
246 Err(poll_exc) => {
247 eprintln!(
248 "[moraine-rust] failed to create poll watcher for {}: {poll_exc}",
249 source_name
250 );
251 record_watcher_error(
252 &metrics_clone,
253 &format!(
254 "poll watcher create failed for {}: {poll_exc}",
255 source_name
256 ),
257 );
258 queue_rescan(
259 &glob_pattern,
260 &source_name,
261 &provider,
262 &tx_clone,
263 &metrics_clone,
264 );
265 return;
266 }
267 }
268 }
269 };
270
271 if let Err(exc) = watcher.watch(watch_root.as_path(), RecursiveMode::Recursive) {
272 eprintln!(
273 "[moraine-rust] failed to watch {} ({}): {exc}",
274 watch_root.display(),
275 source_name
276 );
277 record_watcher_error(
278 &metrics_clone,
279 &format!(
280 "watch root register failed for {}:{}: {exc}",
281 source_name,
282 watch_root.display()
283 ),
284 );
285 queue_rescan(
286 &glob_pattern,
287 &source_name,
288 &provider,
289 &tx_clone,
290 &metrics_clone,
291 );
292 return;
293 }
294 registration.mark_registered();
295
296 loop {
297 match event_rx.recv() {
298 Ok(Ok(event)) => {
299 if event_requires_rescan(&event) {
300 queue_rescan(
301 &glob_pattern,
302 &source_name,
303 &provider,
304 &tx_clone,
305 &metrics_clone,
306 );
307 continue;
308 }
309
310 if !event_is_relevant(&event.kind) {
311 continue;
312 }
313
314 for path in event_jsonl_paths(&event) {
315 let _ = tx_clone.send(WorkItem {
316 source_name: source_name.clone(),
317 provider: provider.clone(),
318 path,
319 });
320 }
321 }
322 Ok(Err(exc)) => {
323 eprintln!("[moraine-rust] watcher event error ({source_name}): {exc}");
324 record_watcher_error(
325 &metrics_clone,
326 &format!("watcher event error ({source_name}): {exc}"),
327 );
328 queue_rescan(
329 &glob_pattern,
330 &source_name,
331 &provider,
332 &tx_clone,
333 &metrics_clone,
334 );
335 }
336 Err(_) => break,
337 }
338 }
339 });
340
341 handles.push(handle);
342 }
343
344 Ok(handles)
345 }
346
347 pub(crate) fn enumerate_jsonl_files(glob_pattern: &str) -> Result<Vec<String>> {
348 let mut files = Vec::<String>::new();
349 for entry in glob(glob_pattern).with_context(|| format!("invalid glob: {}", glob_pattern))? {
350 let path = match entry {
351 Ok(path) => path,
352 Err(exc) => {
353 warn!("glob iteration error: {exc}");
354 continue;
355 }
356 };
357
358 if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
359 files.push(path.to_string_lossy().to_string());
360 }
361 }
362 files.sort();
363 Ok(files)
364 }
365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Metrics;
    use notify::{
        event::{CreateKind, DataChange, Flag, ModifyKind, RenameMode},
        EventKind,
    };
    use std::path::PathBuf;
    use std::sync::atomic::Ordering;
    use tokio::sync::mpsc;

    /// An event carrying the `Rescan` flag must force a full rescan.
    #[test]
    fn rescan_events_require_reconcile() {
        let flagged = Event::new(EventKind::Other).set_flag(Flag::Rescan);
        assert!(event_requires_rescan(&flagged));
    }

    /// Create, data-modify and rename events are relevant; removals are not.
    #[test]
    fn relevant_event_kinds_include_create_modify_data_and_rename() {
        let created = EventKind::Create(CreateKind::Any);
        let data_changed = EventKind::Modify(ModifyKind::Data(DataChange::Any));
        let renamed = EventKind::Modify(ModifyKind::Name(RenameMode::Any));
        let removed = EventKind::Remove(notify::event::RemoveKind::Any);

        assert!(event_is_relevant(&created));
        assert!(event_is_relevant(&data_changed));
        assert!(event_is_relevant(&renamed));
        assert!(!event_is_relevant(&removed));
    }

    /// Duplicate paths collapse to one entry and non-jsonl paths are dropped.
    #[test]
    fn jsonl_paths_are_deduped_and_filtered() {
        let mut event = Event::new(EventKind::Modify(ModifyKind::Data(DataChange::Any)));
        event.paths = ["/tmp/a.jsonl", "/tmp/a.jsonl", "/tmp/b.txt"]
            .iter()
            .map(|raw| PathBuf::from(*raw))
            .collect();

        let extracted = event_jsonl_paths(&event);
        assert_eq!(extracted, vec![String::from("/tmp/a.jsonl")]);
    }

    /// The registration counter goes 0 -> 1 while the guard lives and back
    /// to 0 once it is dropped.
    #[test]
    fn watcher_registration_tracks_active_watches() {
        let metrics = Arc::new(Metrics::default());
        let active = || metrics.watcher_registrations.load(Ordering::Relaxed);
        assert_eq!(active(), 0);

        let mut guard = WatchRegistration::new(metrics.clone());
        guard.mark_registered();
        assert_eq!(active(), 1);
        drop(guard);

        assert_eq!(active(), 0);
    }

    /// An invalid glob queues nothing, still counts as a rescan, and records
    /// a descriptive error message.
    #[test]
    fn queue_rescan_records_enumeration_errors() {
        let metrics = Arc::new(Metrics::default());
        let (tx, mut rx) = mpsc::unbounded_channel();

        queue_rescan("[", "source-alpha", "provider-alpha", &tx, &metrics);

        assert!(rx.try_recv().is_err());
        assert_eq!(metrics.watcher_reset_count.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.watcher_error_count.load(Ordering::Relaxed), 1);

        let recorded = metrics
            .last_error
            .lock()
            .expect("metrics last_error mutex poisoned")
            .clone();
        let expected_fragments = [
            "rescan enumerate failed",
            "source=source-alpha",
            "provider=provider-alpha",
            "glob=[",
        ];
        for fragment in expected_fragments.iter() {
            assert!(recorded.contains(fragment));
        }
    }
}