crates/moraine-ingest-core/src/dispatch.rs
1 use crate::checkpoint::checkpoint_key;
2 use crate::model::{Checkpoint, RowBatch};
3 use crate::normalize::normalize_record;
4 use crate::{DispatchState, Metrics, SinkMessage, WorkItem};
5 use anyhow::{Context, Result};
6 use moraine_config::AppConfig;
7 use serde_json::{json, Value};
8 use std::collections::HashMap;
9 #[cfg(not(unix))]
10 use std::hash::{Hash, Hasher};
11 use std::io::{BufRead, BufReader, Seek, SeekFrom};
12 use std::sync::atomic::Ordering;
13 use std::sync::{Arc, Mutex};
14 use std::time::{Duration, Instant};
15 use tokio::sync::{mpsc, RwLock};
16 use tokio::task::JoinHandle;
17 use tracing::debug;
18
19 #[cfg(not(unix))]
20 use same_file::Handle;
21 #[cfg(unix)]
22 use std::os::unix::fs::MetadataExt;
23 #[cfg(not(unix))]
24 use std::time::UNIX_EPOCH;
25
26 pub(crate) fn spawn_debounce_task(
27 config: AppConfig,
28 mut rx: mpsc::UnboundedReceiver<WorkItem>,
29 process_tx: mpsc::Sender<WorkItem>,
30 dispatch: Arc<Mutex<DispatchState>>,
31 metrics: Arc<Metrics>,
32 ) -> JoinHandle<()> {
33 tokio::spawn(async move {
34 let debounce = Duration::from_millis(config.ingest.debounce_ms.max(5));
35 let mut pending = HashMap::<String, (WorkItem, Instant)>::new();
36 let mut tick = tokio::time::interval(Duration::from_millis(
37 (config.ingest.debounce_ms / 2).max(10),
38 ));
39
40 loop {
41 tokio::select! {
42 maybe_work = rx.recv() => {
43 match maybe_work {
44 Some(work) => {
45 pending.insert(work.key(), (work, Instant::now()));
46 }
47 None => break,
48 }
49 }
50 _ = tick.tick() => {
51 if pending.is_empty() {
52 continue;
53 }
54
55 let now = Instant::now();
56 let ready: Vec<String> = pending
57 .iter()
58 .filter_map(|(key, (_, seen_at))| {
59 if now.duration_since(*seen_at) >= debounce {
60 Some(key.clone())
61 } else {
62 None
63 }
64 })
65 .collect();
66
67 for key in ready {
68 if let Some((work, _)) = pending.remove(&key) {
69 if !work.path.ends_with(".jsonl") {
70 continue;
71 }
72
73 enqueue_work(work, &process_tx, &dispatch, &metrics).await;
74 }
75 }
76 }
77 }
78 }
79 })
80 }
81
82 pub(crate) async fn enqueue_work(
83 work: WorkItem,
84 process_tx: &mpsc::Sender<WorkItem>,
85 dispatch: &Arc<Mutex<DispatchState>>,
86 metrics: &Arc<Metrics>,
87 ) {
88 if !work.path.ends_with(".jsonl") {
89 return;
90 }
91
92 let key = work.key();
93 let mut should_send = false;
94 {
95 let mut state = dispatch.lock().expect("dispatch mutex poisoned");
96 state.item_by_key.insert(key.clone(), work.clone());
97 if state.inflight.contains(&key) {
98 state.dirty.insert(key.clone());
99 } else if state.pending.insert(key.clone()) {
100 should_send = true;
101 }
102 }
103
104 if should_send {
105 if process_tx.send(work).await.is_ok() {
106 metrics.queue_depth.fetch_add(1, Ordering::Relaxed);
107 }
108 }
109 }
110
111 pub(crate) fn complete_work(key: &str, dispatch: &Arc<Mutex<DispatchState>>) -> Option<WorkItem> {
112 let mut state = dispatch.lock().expect("dispatch mutex poisoned");
113 state.inflight.remove(key);
114
115 if state.dirty.remove(key) {
116 if state.pending.insert(key.to_string()) {
117 return state.item_by_key.get(key).cloned();
118 }
119 return None;
120 }
121
122 if !state.pending.contains(key) && !state.inflight.contains(key) && !state.dirty.contains(key) {
123 state.item_by_key.remove(key);
124 }
125
126 None
127 }
128
129 pub(crate) async fn process_file(
130 config: &AppConfig,
131 work: &WorkItem,
132 checkpoints: Arc<RwLock<HashMap<String, Checkpoint>>>,
133 sink_tx: mpsc::Sender<SinkMessage>,
134 metrics: &Arc<Metrics>,
135 ) -> Result<()> {
136 let source_file = &work.path;
137
138 let meta = match std::fs::metadata(source_file) {
139 Ok(meta) => meta,
140 Err(exc) => {
141 debug!("metadata missing for {}: {}", source_file, exc);
142 return Ok(());
143 }
144 };
145
146 let inode = source_inode_for_file(source_file, &meta);
147
148 let file_size = meta.len();
149 let cp_key = checkpoint_key(&work.source_name, source_file);
150 let committed = { checkpoints.read().await.get(&cp_key).cloned() };
151
152 let mut checkpoint = committed.unwrap_or(Checkpoint {
153 source_name: work.source_name.clone(),
154 source_file: source_file.to_string(),
155 source_inode: inode,
156 source_generation: 1,
157 last_offset: 0,
158 last_line_no: 0,
159 status: "active".to_string(),
160 });
161
162 let mut generation_changed = false;
163 if checkpoint.source_inode != inode || file_size < checkpoint.last_offset {
164 checkpoint.source_inode = inode;
165 checkpoint.source_generation = checkpoint.source_generation.saturating_add(1).max(1);
166 checkpoint.last_offset = 0;
167 checkpoint.last_line_no = 0;
168 checkpoint.status = "active".to_string();
169 generation_changed = true;
170 }
171
172 if file_size == checkpoint.last_offset && !generation_changed {
173 return Ok(());
174 }
175
176 let mut file = std::fs::File::open(source_file)
177 .with_context(|| format!("failed to open {}", source_file))?;
178 file.seek(SeekFrom::Start(checkpoint.last_offset))
179 .with_context(|| format!("failed to seek {}", source_file))?;
180
181 let mut reader = BufReader::new(file);
182 let mut offset = checkpoint.last_offset;
183 let mut line_no = checkpoint.last_line_no;
184 let mut session_hint = String::new();
185 let mut model_hint = String::new();
186
187 let mut batch = RowBatch::default();
188
189 loop {
190 let start_offset = offset;
191 let mut buf = Vec::<u8>::new();
192 let bytes_read = reader
193 .read_until(b'\n', &mut buf)
194 .with_context(|| format!("failed reading {}", source_file))?;
195
196 if bytes_read == 0 {
197 break;
198 }
199
200 offset = offset.saturating_add(bytes_read as u64);
201 line_no = line_no.saturating_add(1);
202
203 let mut text = String::from_utf8_lossy(&buf).to_string();
204 if text.ends_with('\n') {
205 text.pop();
206 }
207
208 if text.trim().is_empty() {
209 continue;
210 }
211
212 let parsed: Value = match serde_json::from_str::<Value>(&text) {
213 Ok(value) if value.is_object() => value,
214 Ok(_) => {
215 batch.error_rows.push(json!({
216 "source_name": work.source_name,
217 "provider": work.provider,
218 "source_file": source_file,
219 "source_inode": inode,
220 "source_generation": checkpoint.source_generation,
221 "source_line_no": line_no,
222 "source_offset": start_offset,
223 "error_kind": "json_parse_error",
224 "error_text": "Expected JSON object",
225 "raw_fragment": truncate(&text, 20_000),
226 }));
227 continue;
228 }
229 Err(exc) => {
230 batch.error_rows.push(json!({
231 "source_name": work.source_name,
232 "provider": work.provider,
233 "source_file": source_file,
234 "source_inode": inode,
235 "source_generation": checkpoint.source_generation,
236 "source_line_no": line_no,
237 "source_offset": start_offset,
238 "error_kind": "json_parse_error",
239 "error_text": exc.to_string(),
240 "raw_fragment": truncate(&text, 20_000),
241 }));
242 continue;
243 }
244 };
245
246 let normalized = match normalize_record(
247 &parsed,
248 &work.source_name,
249 &work.provider,
250 source_file,
251 inode,
252 checkpoint.source_generation,
253 line_no,
254 start_offset,
255 &session_hint,
256 &model_hint,
257 ) {
258 Ok(normalized) => normalized,
259 Err(exc) => {
260 batch.error_rows.push(json!({
261 "source_name": work.source_name,
262 "provider": work.provider,
263 "source_file": source_file,
264 "source_inode": inode,
265 "source_generation": checkpoint.source_generation,
266 "source_line_no": line_no,
267 "source_offset": start_offset,
268 "error_kind": "normalize_error",
269 "error_text": exc.to_string(),
270 "raw_fragment": truncate(&text, 20_000),
271 }));
272 continue;
273 }
274 };
275
276 session_hint = normalized.session_hint;
277 model_hint = normalized.model_hint;
278 batch.raw_rows.push(normalized.raw_row);
279 batch.event_rows.extend(normalized.event_rows);
280 batch.link_rows.extend(normalized.link_rows);
281 batch.tool_rows.extend(normalized.tool_rows);
282 batch.error_rows.extend(normalized.error_rows);
283 batch.lines_processed = batch.lines_processed.saturating_add(1);
284
285 if batch.row_count() >= config.ingest.batch_size {
286 let mut chunk = RowBatch::default();
287 chunk.raw_rows = std::mem::take(&mut batch.raw_rows);
288 chunk.event_rows = std::mem::take(&mut batch.event_rows);
289 chunk.link_rows = std::mem::take(&mut batch.link_rows);
290 chunk.tool_rows = std::mem::take(&mut batch.tool_rows);
291 chunk.error_rows = std::mem::take(&mut batch.error_rows);
292 chunk.lines_processed = batch.lines_processed;
293 batch.lines_processed = 0;
294 chunk.checkpoint = Some(Checkpoint {
295 source_name: work.source_name.clone(),
296 source_file: source_file.to_string(),
297 source_inode: inode,
298 source_generation: checkpoint.source_generation,
299 last_offset: offset,
300 last_line_no: line_no,
301 status: "active".to_string(),
302 });
303
304 sink_tx
305 .send(SinkMessage::Batch(chunk))
306 .await
307 .context("sink channel closed while sending chunk")?;
308 }
309 }
310
311 let final_checkpoint = Checkpoint {
312 source_name: work.source_name.clone(),
313 source_file: source_file.to_string(),
314 source_inode: inode,
315 source_generation: checkpoint.source_generation,
316 last_offset: offset,
317 last_line_no: line_no,
318 status: "active".to_string(),
319 };
320
321 if batch.row_count() > 0 || generation_changed || offset != checkpoint.last_offset {
322 batch.checkpoint = Some(final_checkpoint);
323 sink_tx
324 .send(SinkMessage::Batch(batch))
325 .await
326 .context("sink channel closed while sending final batch")?;
327 }
328
329 if metrics.queue_depth.load(Ordering::Relaxed) == 0 {
330 debug!(
331 "{}:{} caught up at offset {}",
332 work.source_name, source_file, offset
333 );
334 }
335
336 Ok(())
337 }
338
/// Returns a stable identity for `source_file`, used by `process_file` to
/// detect rotation (same path, different underlying file).
///
/// On Unix this is the filesystem inode; elsewhere a hash-based fallback
/// (`non_unix_source_inode`) stands in for it.
fn source_inode_for_file(source_file: &str, meta: &std::fs::Metadata) -> u64 {
    #[cfg(unix)]
    {
        // The path is only needed by the non-Unix fallback.
        let _ = source_file;
        meta.ino()
    }

    #[cfg(not(unix))]
    {
        non_unix_source_inode(source_file, meta)
    }
}
351
/// Best-effort file identity on platforms without inodes.
///
/// Prefers the OS file-handle identity obtained via `same_file::Handle`;
/// falls back to hashing the path plus the file's creation time. The return
/// value is forced non-zero so callers (and the tests) can rely on 0 never
/// being a valid identity.
#[cfg(not(unix))]
fn non_unix_source_inode(source_file: &str, meta: &std::fs::Metadata) -> u64 {
    if let Ok(handle) = Handle::from_path(source_file) {
        let id = hash_identity(&handle);
        // A hash of exactly 0 is treated as "unavailable" and falls through
        // to the path/ctime fallback below, preserving the non-zero invariant.
        if id != 0 {
            return id;
        }
    }

    // Fallback when a platform file handle identity is unavailable.
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    source_file.hash(&mut hasher);
    if let Ok(created_at) = meta.created() {
        if let Ok(since_epoch) = created_at.duration_since(UNIX_EPOCH) {
            // Creation time distinguishes a replaced file at the same path.
            since_epoch.as_nanos().hash(&mut hasher);
        }
    }

    let id = hasher.finish();
    // Map the (astronomically unlikely) 0 hash to 1 to keep 0 reserved.
    if id == 0 {
        1
    } else {
        id
    }
}
377
/// Hashes any `Hash` value with the standard library's `DefaultHasher`.
#[cfg(not(unix))]
fn hash_identity<T: Hash>(value: &T) -> u64 {
    let mut state = std::collections::hash_map::DefaultHasher::default();
    value.hash(&mut state);
    state.finish()
}
384
/// Returns `input` limited to at most `max_chars` Unicode scalar values.
///
/// Single pass over the string: finds the byte offset of the char at index
/// `max_chars` and slices up to it, instead of the previous two-pass
/// `chars().count()` followed by `take().collect()`.
fn truncate(input: &str, max_chars: usize) -> String {
    match input.char_indices().nth(max_chars) {
        // A char exists at index `max_chars`, so the string is too long;
        // keep everything before that char's byte offset (a valid boundary).
        Some((byte_end, _)) => input[..byte_end].to_string(),
        // Fewer than or exactly `max_chars` chars: return it unchanged.
        None => input.to_string(),
    }
}
391
#[cfg(test)]
mod tests {
    use super::{complete_work, source_inode_for_file};
    use crate::{DispatchState, WorkItem};
    use std::fs;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Builds a WorkItem with fixed source/provider names for the given path.
    fn sample_work(path: &str) -> WorkItem {
        WorkItem {
            source_name: "test-source".to_string(),
            provider: "test-provider".to_string(),
            path: path.to_string(),
        }
    }

    // Temp-file path made unique with a nanosecond timestamp so concurrent
    // test runs do not collide on the same file.
    fn unique_test_file(name: &str) -> PathBuf {
        let suffix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock before unix epoch")
            .as_nanos();
        std::env::temp_dir().join(format!("moraine-dispatch-{name}-{suffix}.jsonl"))
    }

    // Completing a key that is neither dirty nor pending should prune its
    // cached item entirely.
    #[test]
    fn complete_work_prunes_idle_item() {
        let dispatch = Arc::new(Mutex::new(DispatchState::default()));
        let work = sample_work("/tmp/idle.jsonl");
        let key = work.key();

        {
            let mut state = dispatch.lock().expect("dispatch mutex poisoned");
            state.inflight.insert(key.clone());
            state.item_by_key.insert(key.clone(), work);
        }

        let reschedule = complete_work(&key, &dispatch);
        assert!(reschedule.is_none());

        let state = dispatch.lock().expect("dispatch mutex poisoned");
        assert!(!state.inflight.contains(&key));
        assert!(!state.pending.contains(&key));
        assert!(!state.dirty.contains(&key));
        assert!(!state.item_by_key.contains_key(&key));
    }

    // A key marked dirty while inflight must be moved back to pending and
    // its latest item returned for rescheduling.
    #[test]
    fn complete_work_reschedules_dirty_item() {
        let dispatch = Arc::new(Mutex::new(DispatchState::default()));
        let work = sample_work("/tmp/dirty.jsonl");
        let key = work.key();

        {
            let mut state = dispatch.lock().expect("dispatch mutex poisoned");
            state.inflight.insert(key.clone());
            state.dirty.insert(key.clone());
            state.item_by_key.insert(key.clone(), work.clone());
        }

        let reschedule = complete_work(&key, &dispatch);
        assert_eq!(
            reschedule.as_ref().map(|item| item.path.as_str()),
            Some(work.path.as_str())
        );

        let state = dispatch.lock().expect("dispatch mutex poisoned");
        assert!(!state.inflight.contains(&key));
        assert!(!state.dirty.contains(&key));
        assert!(state.pending.contains(&key));
        assert!(state.item_by_key.contains_key(&key));
    }

    // A key that is still pending keeps its cached item — pruning it would
    // lose the work the pending entry refers to.
    #[test]
    fn complete_work_keeps_item_when_still_pending() {
        let dispatch = Arc::new(Mutex::new(DispatchState::default()));
        let work = sample_work("/tmp/pending.jsonl");
        let key = work.key();

        {
            let mut state = dispatch.lock().expect("dispatch mutex poisoned");
            state.pending.insert(key.clone());
            state.item_by_key.insert(key.clone(), work);
        }

        let reschedule = complete_work(&key, &dispatch);
        assert!(reschedule.is_none());

        let state = dispatch.lock().expect("dispatch mutex poisoned");
        assert!(state.pending.contains(&key));
        assert!(state.item_by_key.contains_key(&key));
    }

    // Rewriting the same file in place must not change its identity.
    // NOTE(review): despite the expect message, fs::write truncates and
    // rewrites rather than appending — on Unix the inode is still unchanged,
    // which is what this asserts.
    #[test]
    fn source_inode_is_stable_for_same_file() {
        let path = unique_test_file("identity-stable");
        fs::write(&path, "{\"line\":1}\n").expect("write initial file");
        let source_file = path.to_string_lossy().to_string();

        let first_meta = fs::metadata(&path).expect("metadata for initial file");
        let first_id = source_inode_for_file(&source_file, &first_meta);
        assert_ne!(first_id, 0);

        fs::write(&path, "{\"line\":1}\n{\"line\":2}\n").expect("append file content");
        let second_meta = fs::metadata(&path).expect("metadata after append");
        let second_id = source_inode_for_file(&source_file, &second_meta);

        // Clean up before asserting so a failure doesn't leak the temp file.
        let _ = fs::remove_file(&path);
        assert_eq!(first_id, second_id);
    }

    // Replacing the file via rename (the rotation pattern) must yield a new
    // identity, which is what triggers a generation bump in process_file.
    #[test]
    fn source_inode_changes_when_file_is_replaced() {
        let path = unique_test_file("identity-replaced");
        let replacement = unique_test_file("identity-replacement");
        fs::write(&path, "{\"line\":1}\n").expect("write original file");
        let source_file = path.to_string_lossy().to_string();

        let original_meta = fs::metadata(&path).expect("metadata for original file");
        let original_id = source_inode_for_file(&source_file, &original_meta);
        assert_ne!(original_id, 0);

        fs::write(&replacement, "{\"line\":99}\n").expect("write replacement file");
        fs::rename(&replacement, &path).expect("replace file via rename");

        let replaced_meta = fs::metadata(&path).expect("metadata for replaced file");
        let replaced_id = source_inode_for_file(&source_file, &replaced_meta);

        // Clean up before asserting so a failure doesn't leak the temp file.
        let _ = fs::remove_file(&path);
        assert_ne!(original_id, replaced_id);
    }
}
523 }