// crates/moraine-ingest-core/src/lib.rs


     1 mod checkpoint;
     2 mod dispatch;
     3 mod heartbeat;
     4 pub mod model;
     5 pub mod normalize;
     6 mod reconcile;
     7 mod sink;
     8 mod watch;
     9  
    10 use crate::checkpoint::checkpoint_key;
    11 use crate::dispatch::{complete_work, enqueue_work, process_file, spawn_debounce_task};
    12 use crate::model::RowBatch;
    13 use crate::reconcile::spawn_reconcile_task;
    14 use crate::sink::spawn_sink_task;
    15 use crate::watch::{enumerate_jsonl_files, spawn_watcher_threads};
    16 use anyhow::{Context, Result};
    17 use moraine_clickhouse::ClickHouseClient;
    18 use moraine_config::{AppConfig, IngestSource};
    19 use serde::Deserialize;
    20 use std::collections::{HashMap, HashSet};
    21 use std::sync::atomic::{AtomicU64, Ordering};
    22 use std::sync::{Arc, Mutex};
    23 use tokio::sync::{mpsc, RwLock, Semaphore};
    24 use tracing::{error, info};
    25  
// Encodings stored in `Metrics::watcher_backend_state` (an `AtomicU64`)
// describing which file-watch backend the watcher threads are using.
pub(crate) const WATCHER_BACKEND_UNKNOWN: u64 = 0;
// Native OS notification backend (e.g. inotify/FSEvents — confirm in watch.rs).
pub(crate) const WATCHER_BACKEND_NATIVE: u64 = 1;
// Polling fallback backend.
pub(crate) const WATCHER_BACKEND_POLL: u64 = 2;
// Multiple sources ended up on different backends.
pub(crate) const WATCHER_BACKEND_MIXED: u64 = 3;
    30  
    31 #[derive(Debug, Clone)]
    32 pub(crate) struct WorkItem {
    33     pub(crate) source_name: String,
    34     pub(crate) provider: String,
    35     pub(crate) path: String,
    36 }
    37  
    38 impl WorkItem {
    39     pub(crate) fn key(&self) -> String {
    40         format!("{}\n{}", self.source_name, self.path)
    41     }
    42 }
    43  
/// Scheduling bookkeeping for work items, shared behind a `std::sync::Mutex`.
///
/// The processor loop in `run_ingestor` moves a key from `pending` to
/// `inflight` when it picks the item up, and `dispatch::complete_work`
/// clears it on completion (optionally returning an item to reschedule).
#[derive(Default)]
pub(crate) struct DispatchState {
    // Keys that have been enqueued on the process channel but not yet
    // picked up by the processor loop.
    pub(crate) pending: HashSet<String>,
    // Keys currently being processed by a worker task.
    pub(crate) inflight: HashSet<String>,
    // NOTE(review): presumably keys that changed while inflight and need a
    // re-run after the current pass — confirm in `dispatch::complete_work`.
    pub(crate) dirty: HashSet<String>,
    // Latest `WorkItem` recorded for each key, so work can be rebuilt
    // (e.g. when rescheduling) without re-reading the event source.
    pub(crate) item_by_key: HashMap<String, WorkItem>,
}
    51  
/// Process-wide ingest metrics. Counters visible in this file are updated
/// with `Ordering::Relaxed`; `last_error` holds the most recent worker
/// error text behind a `std::sync::Mutex`.
#[derive(Default)]
pub(crate) struct Metrics {
    // Row counters — presumably incremented by the sink task on successful
    // writes (not updated in this file) — TODO confirm in sink.rs.
    pub(crate) raw_rows_written: AtomicU64,
    pub(crate) event_rows_written: AtomicU64,
    pub(crate) err_rows_written: AtomicU64,
    // Timing gauges; names suggest milliseconds — confirm at the write site.
    pub(crate) last_flush_ms: AtomicU64,
    pub(crate) append_to_visible_p50_ms: AtomicU64,
    pub(crate) append_to_visible_p95_ms: AtomicU64,
    pub(crate) flush_failures: AtomicU64,
    // Items currently queued on the process channel: decremented by the
    // processor loop on receive, incremented on (re)enqueue.
    pub(crate) queue_depth: AtomicU64,
    // Watcher health counters.
    pub(crate) watcher_registrations: AtomicU64,
    pub(crate) watcher_error_count: AtomicU64,
    pub(crate) watcher_reset_count: AtomicU64,
    pub(crate) watcher_last_reset_unix_ms: AtomicU64,
    // One of the WATCHER_BACKEND_* constants defined in this file.
    pub(crate) watcher_backend_state: AtomicU64,
    // Text of the most recent `process_file` error (set by worker tasks).
    pub(crate) last_error: Mutex<String>,
}
    69  
/// Messages consumed by the sink task spawned via `spawn_sink_task`.
#[derive(Debug)]
pub(crate) enum SinkMessage {
    /// A batch of rows produced by `process_file`, to be written by the sink.
    Batch(RowBatch),
}
    74  
/// Run the ingest pipeline until a Ctrl-C shutdown signal arrives.
///
/// Pipeline topology, as wired below:
/// - watcher OS threads emit changed paths onto an unbounded channel;
/// - a debounce task coalesces them and enqueues `WorkItem`s onto the
///   bounded process queue;
/// - the processor loop fans items out to per-file worker tasks, bounded
///   by a semaphore of `max_file_workers` permits;
/// - workers send `SinkMessage::Batch`es to a single sink task (which
///   receives the ClickHouse client and the checkpoint map);
/// - a reconcile task periodically re-enqueues work for the same sources.
///
/// # Errors
/// Fails if no sources are enabled, the ClickHouse client cannot be built
/// or pinged, checkpoints cannot be loaded, watcher threads cannot be
/// spawned, startup backfill enumeration fails, or the Ctrl-C handler
/// cannot be installed.
pub async fn run_ingestor(config: AppConfig) -> Result<()> {
    // Only sources explicitly enabled in config participate in this run.
    let enabled_sources: Vec<IngestSource> = config
        .ingest
        .sources
        .iter()
        .filter(|src| src.enabled)
        .cloned()
        .collect();

    if enabled_sources.is_empty() {
        return Err(anyhow::anyhow!(
            "no enabled ingest sources found in config.ingest.sources"
        ));
    }

    // Fail fast: verify ClickHouse connectivity before spawning anything.
    let clickhouse = ClickHouseClient::new(config.clickhouse.clone())?;
    clickhouse.ping().await.context("clickhouse ping failed")?;

    let checkpoint_map = load_checkpoints(&clickhouse)
        .await
        .context("failed to load checkpoints from clickhouse")?;

    info!(
        "loaded {} checkpoints across {} sources",
        checkpoint_map.len(),
        enabled_sources.len()
    );

    // Shared state: checkpoints behind a tokio RwLock (usable across
    // awaits), dispatch bookkeeping behind a std Mutex (only held for
    // short synchronous sections), and atomic metric counters.
    let checkpoints = Arc::new(RwLock::new(checkpoint_map));
    let dispatch = Arc::new(Mutex::new(DispatchState::default()));
    let metrics = Arc::new(Metrics::default());

    // Process queue capacity scales with the inflight-batch limit (x16),
    // with a floor of 1024 so small configs still absorb event bursts.
    let process_queue_capacity = config
        .ingest
        .max_inflight_batches
        .saturating_mul(16)
        .max(1024);
    let (process_tx, mut process_rx) = mpsc::channel::<WorkItem>(process_queue_capacity);
    let (sink_tx, sink_rx) =
        mpsc::channel::<SinkMessage>(config.ingest.max_inflight_batches.max(16));
    // Unbounded on purpose: the producers are plain OS watcher threads
    // that must never block on a full async channel.
    let (watch_path_tx, watch_path_rx) = mpsc::unbounded_channel::<WorkItem>();

    let sink_handle = spawn_sink_task(
        config.clone(),
        clickhouse.clone(),
        checkpoints.clone(),
        metrics.clone(),
        sink_rx,
        dispatch.clone(),
    );

    // Concurrency cap for per-file workers; always at least one permit.
    let sem = Arc::new(Semaphore::new(config.ingest.max_file_workers.max(1)));
    let processor_handle = {
        let process_tx_clone = process_tx.clone();
        let sink_tx_clone = sink_tx.clone();
        let checkpoints_clone = checkpoints.clone();
        let dispatch_clone = dispatch.clone();
        let sem_clone = sem.clone();
        let metrics_clone = metrics.clone();
        let cfg_clone = config.clone();

        tokio::spawn(async move {
            while let Some(work) = process_rx.recv().await {
                // Item left the queue: decrement the queue-depth gauge.
                metrics_clone.queue_depth.fetch_sub(1, Ordering::Relaxed);
                let key = work.key();

                // Transition pending -> inflight under the dispatch lock.
                {
                    let mut state = dispatch_clone.lock().expect("dispatch mutex poisoned");
                    state.pending.remove(&key);
                    state.inflight.insert(key.clone());
                }

                // acquire_owned fails only if the semaphore was closed,
                // which we treat as a shutdown signal for this loop.
                let permit = match sem_clone.clone().acquire_owned().await {
                    Ok(permit) => permit,
                    Err(_) => break,
                };

                let sink_tx_worker = sink_tx_clone.clone();
                let process_tx_worker = process_tx_clone.clone();
                let checkpoints_worker = checkpoints_clone.clone();
                let dispatch_worker = dispatch_clone.clone();
                let cfg_worker = cfg_clone.clone();
                let metrics_worker = metrics_clone.clone();

                tokio::spawn(async move {
                    // Hold the permit for this worker's full lifetime so the
                    // semaphore bounds concurrent process_file calls.
                    let _permit = permit;
                    if let Err(exc) = process_file(
                        &cfg_worker,
                        &work,
                        checkpoints_worker,
                        sink_tx_worker,
                        &metrics_worker,
                    )
                    .await
                    {
                        error!(
                            "failed processing {}:{}: {exc}",
                            work.source_name, work.path
                        );
                        *metrics_worker
                            .last_error
                            .lock()
                            .expect("metrics last_error mutex poisoned") = exc.to_string();
                    }

                    // complete_work clears the inflight mark and may return
                    // an item to reschedule (presumably when the file was
                    // dirtied while inflight — see dispatch.rs).
                    let reschedule = complete_work(&key, &dispatch_worker);

                    if let Some(item) = reschedule {
                        if process_tx_worker.send(item).await.is_ok() {
                            metrics_worker.queue_depth.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
        })
    };

    let debounce_handle = spawn_debounce_task(
        config.clone(),
        watch_path_rx,
        process_tx.clone(),
        dispatch.clone(),
        metrics.clone(),
    );

    let reconcile_handle = spawn_reconcile_task(
        config.clone(),
        enabled_sources.clone(),
        process_tx.clone(),
        dispatch.clone(),
        metrics.clone(),
    );

    let watcher_threads =
        spawn_watcher_threads(enabled_sources.clone(), watch_path_tx, metrics.clone())?;

    // Optionally queue every currently-existing file once at startup so
    // history is ingested even if no watch event ever fires for it.
    if config.ingest.backfill_on_start {
        for source in &enabled_sources {
            let files = enumerate_jsonl_files(&source.glob)?;
            info!(
                "startup backfill queueing {} files for source={}",
                files.len(),
                source.name
            );
            for path in files {
                enqueue_work(
                    WorkItem {
                        source_name: source.name.clone(),
                        provider: source.provider.clone(),
                        path,
                    },
                    &process_tx,
                    &dispatch,
                    &metrics,
                )
                .await;
            }
        }
    }

    info!("rust ingestor running; waiting for shutdown signal");
    tokio::signal::ctrl_c()
        .await
        .context("signal handler failed")?;
    info!("shutdown signal received");

    // Drop our senders so recv() loops can observe channel closure.
    drop(process_tx);
    drop(sink_tx);

    // NOTE(review): tasks are aborted rather than awaited after the drops,
    // so an in-flight batch may be discarded at shutdown — confirm this
    // at-least-once-with-checkpoints behavior is intended.
    debounce_handle.abort();
    reconcile_handle.abort();
    processor_handle.abort();
    sink_handle.abort();

    // NOTE(review): this loop only reads each thread id — it does NOT join
    // the watcher threads, so they are effectively detached and die with
    // the process. Confirm intended (joining could block shutdown if a
    // watcher loop never exits).
    for handle in watcher_threads {
        let _ = handle.thread().id();
    }

    Ok(())
}
   255  
/// Row shape returned by the checkpoint query in `load_checkpoints`.
/// Field names must match the SELECT aliases exactly for deserialization.
#[derive(Deserialize)]
struct CheckpointRow {
    source_name: String,
    source_file: String,
    // argMax(source_inode, updated_at), cast to UInt64 in the query.
    source_inode: u64,
    // argMax(source_generation, updated_at), cast to UInt32 in the query.
    source_generation: u32,
    // Byte offset reached in the file at the last checkpoint.
    last_offset: u64,
    // Line number reached in the file at the last checkpoint.
    last_line_no: u64,
    // argMax(status, updated_at) — latest recorded status string.
    status: String,
}
   266  
   267 async fn load_checkpoints(
   268     clickhouse: &ClickHouseClient,
   269 ) -> Result<HashMap<String, model::Checkpoint>> {
   270     let query = format!(
   271         "SELECT \
   272             source_name, \
   273             source_file, \
   274             toUInt64(argMax(source_inode, updated_at)) AS source_inode, \
   275             toUInt32(argMax(source_generation, updated_at)) AS source_generation, \
   276             toUInt64(argMax(last_offset, updated_at)) AS last_offset, \
   277             toUInt64(argMax(last_line_no, updated_at)) AS last_line_no, \
   278             argMax(status, updated_at) AS status \
   279          FROM {}.ingest_checkpoints \
   280          GROUP BY source_name, source_file",
   281         clickhouse.config().database
   282     );
   283  
   284     let rows: Vec<CheckpointRow> = clickhouse.query_rows(&query, None).await?;
   285     let mut map = HashMap::<String, model::Checkpoint>::new();
   286  
   287     for row in rows {
   288         let key = checkpoint_key(&row.source_name, &row.source_file);
   289         map.insert(
   290             key,
   291             model::Checkpoint {
   292                 source_name: row.source_name,
   293                 source_file: row.source_file,
   294                 source_inode: row.source_inode,
   295                 source_generation: row.source_generation.max(1),
   296                 last_offset: row.last_offset,
   297                 last_line_no: row.last_line_no,
   298                 status: row.status,
   299             },
   300         );
   301     }
   302  
   303     Ok(map)
   304 }