Skip to content

crates/moraine-ingest-core/src/reconcile.rs


     1 use crate::dispatch::enqueue_work;
     2 use crate::watch::enumerate_jsonl_files;
     3 use crate::{DispatchState, Metrics, WorkItem};
     4 use moraine_config::{AppConfig, IngestSource};
     5 use std::sync::{Arc, Mutex};
     6 use std::time::Duration;
     7 use tokio::sync::mpsc;
     8 use tokio::task::JoinHandle;
     9 use tracing::{debug, warn};
    10  
    11 pub(crate) fn spawn_reconcile_task(
    12     config: AppConfig,
    13     sources: Vec<IngestSource>,
    14     process_tx: mpsc::Sender<WorkItem>,
    15     dispatch: Arc<Mutex<DispatchState>>,
    16     metrics: Arc<Metrics>,
    17 ) -> JoinHandle<()> {
    18     tokio::spawn(async move {
    19         let interval = Duration::from_secs_f64(config.ingest.reconcile_interval_seconds.max(5.0));
    20         let mut ticker = tokio::time::interval(interval);
    21  
    22         loop {
    23             ticker.tick().await;
    24             for source in &sources {
    25                 match enumerate_jsonl_files(&source.glob) {
    26                     Ok(paths) => {
    27                         debug!(
    28                             "reconcile scanning {} files for source={}",
    29                             paths.len(),
    30                             source.name
    31                         );
    32                         for path in paths {
    33                             enqueue_work(
    34                                 WorkItem {
    35                                     source_name: source.name.clone(),
    36                                     provider: source.provider.clone(),
    37                                     path,
    38                                 },
    39                                 &process_tx,
    40                                 &dispatch,
    41                                 &metrics,
    42                             )
    43                             .await;
    44                         }
    45                     }
    46                     Err(exc) => {
    47                         warn!("reconcile scan failed for source={}: {exc}", source.name);
    48                     }
    49                 }
    50             }
    51         }
    52     })
    53 }