crates/moraine-ingest-core/src/reconcile.rs
1 use crate::dispatch::enqueue_work;
2 use crate::watch::enumerate_jsonl_files;
3 use crate::{DispatchState, Metrics, WorkItem};
4 use moraine_config::{AppConfig, IngestSource};
5 use std::sync::{Arc, Mutex};
6 use std::time::Duration;
7 use tokio::sync::mpsc;
8 use tokio::task::JoinHandle;
9 use tracing::{debug, warn};
10
11 pub(crate) fn spawn_reconcile_task(
12 config: AppConfig,
13 sources: Vec<IngestSource>,
14 process_tx: mpsc::Sender<WorkItem>,
15 dispatch: Arc<Mutex<DispatchState>>,
16 metrics: Arc<Metrics>,
17 ) -> JoinHandle<()> {
18 tokio::spawn(async move {
19 let interval = Duration::from_secs_f64(config.ingest.reconcile_interval_seconds.max(5.0));
20 let mut ticker = tokio::time::interval(interval);
21
22 loop {
23 ticker.tick().await;
24 for source in &sources {
25 match enumerate_jsonl_files(&source.glob) {
26 Ok(paths) => {
27 debug!(
28 "reconcile scanning {} files for source={}",
29 paths.len(),
30 source.name
31 );
32 for path in paths {
33 enqueue_work(
34 WorkItem {
35 source_name: source.name.clone(),
36 provider: source.provider.clone(),
37 path,
38 },
39 &process_tx,
40 &dispatch,
41 &metrics,
42 )
43 .await;
44 }
45 }
46 Err(exc) => {
47 warn!("reconcile scan failed for source={}: {exc}", source.name);
48 }
49 }
50 }
51 }
52 })
53 }