crates/moraine-ingest-core/src/lib.rs
1 mod checkpoint;
2 mod dispatch;
3 mod heartbeat;
4 pub mod model;
5 pub mod normalize;
6 mod reconcile;
7 mod sink;
8 mod watch;
9
10 use crate::checkpoint::checkpoint_key;
11 use crate::dispatch::{complete_work, enqueue_work, process_file, spawn_debounce_task};
12 use crate::model::RowBatch;
13 use crate::reconcile::spawn_reconcile_task;
14 use crate::sink::spawn_sink_task;
15 use crate::watch::{enumerate_jsonl_files, spawn_watcher_threads};
16 use anyhow::{Context, Result};
17 use moraine_clickhouse::ClickHouseClient;
18 use moraine_config::{AppConfig, IngestSource};
19 use serde::Deserialize;
20 use std::collections::{HashMap, HashSet};
21 use std::sync::atomic::{AtomicU64, Ordering};
22 use std::sync::{Arc, Mutex};
23 use tokio::sync::{mpsc, RwLock, Semaphore};
24 use tracing::{error, info};
25
// Encodings for `Metrics::watcher_backend_state` (an `AtomicU64`), reported by
// the watcher threads in `watch`. Names suggest: no backend determined yet,
// OS-native notifications, polling fallback, or a mix across sources — confirm
// exact semantics against `spawn_watcher_threads`.
pub(crate) const WATCHER_BACKEND_UNKNOWN: u64 = 0;
pub(crate) const WATCHER_BACKEND_NATIVE: u64 = 1;
pub(crate) const WATCHER_BACKEND_POLL: u64 = 2;
pub(crate) const WATCHER_BACKEND_MIXED: u64 = 3;
30
/// One unit of ingestion work: a single file belonging to a configured source.
#[derive(Debug, Clone)]
pub(crate) struct WorkItem {
    pub(crate) source_name: String,
    pub(crate) provider: String,
    pub(crate) path: String,
}

impl WorkItem {
    /// Deduplication key used by `DispatchState`: the source name and file
    /// path joined with a single `'\n'` separator.
    pub(crate) fn key(&self) -> String {
        let mut joined =
            String::with_capacity(self.source_name.len() + 1 + self.path.len());
        joined.push_str(&self.source_name);
        joined.push('\n');
        joined.push_str(&self.path);
        joined
    }
}
43
/// Work-queue bookkeeping, shared behind an `Arc<Mutex<_>>`.
///
/// All sets/maps are keyed by `WorkItem::key()`. The processor loop moves a
/// key from `pending` to `inflight` when it picks the item up; `dirty` items
/// are presumably those re-notified while inflight, so `complete_work` can
/// hand them back for rescheduling — confirm against `dispatch::complete_work`.
#[derive(Default)]
pub(crate) struct DispatchState {
    // Enqueued but not yet received by the processor loop.
    pub(crate) pending: HashSet<String>,
    // Currently being processed by a worker task.
    pub(crate) inflight: HashSet<String>,
    // Re-notified while inflight; candidates for rescheduling on completion.
    pub(crate) dirty: HashSet<String>,
    // Key -> full WorkItem, so a bare key can be turned back into work.
    pub(crate) item_by_key: HashMap<String, WorkItem>,
}
51
/// Process-wide counters and gauges, shared via `Arc` across all tasks.
/// Atomics are updated with `Ordering::Relaxed`; none of these values gate
/// control flow, they are observational only.
#[derive(Default)]
pub(crate) struct Metrics {
    pub(crate) raw_rows_written: AtomicU64,
    pub(crate) event_rows_written: AtomicU64,
    pub(crate) err_rows_written: AtomicU64,
    pub(crate) last_flush_ms: AtomicU64,
    // Append-to-visible latency percentiles, in milliseconds (written by the
    // sink task — confirm units against `sink`).
    pub(crate) append_to_visible_p50_ms: AtomicU64,
    pub(crate) append_to_visible_p95_ms: AtomicU64,
    pub(crate) flush_failures: AtomicU64,
    // Number of WorkItems sitting in the process channel: incremented on
    // successful enqueue, decremented when the processor loop receives one.
    pub(crate) queue_depth: AtomicU64,
    pub(crate) watcher_registrations: AtomicU64,
    pub(crate) watcher_error_count: AtomicU64,
    pub(crate) watcher_reset_count: AtomicU64,
    pub(crate) watcher_last_reset_unix_ms: AtomicU64,
    // One of the WATCHER_BACKEND_* constants.
    pub(crate) watcher_backend_state: AtomicU64,
    // Most recent file-processing error message (set by worker tasks).
    pub(crate) last_error: Mutex<String>,
}
69
/// Messages consumed by the sink task spawned in `spawn_sink_task`.
#[derive(Debug)]
pub(crate) enum SinkMessage {
    /// A batch of rows produced by `process_file`, handed to the ClickHouse
    /// sink for writing.
    Batch(RowBatch),
}
74
/// Run the ingestion pipeline until a Ctrl-C shutdown signal arrives.
///
/// Pipeline shape (stages connected by channels):
/// watcher threads -> debounce task -> process queue -> per-file worker tasks
/// -> sink task, plus a periodic reconcile task over the same process queue.
///
/// # Errors
/// Fails fast when no sources are enabled, ClickHouse is unreachable or the
/// checkpoint load fails, watcher threads cannot be spawned, startup backfill
/// enumeration fails, or the Ctrl-C signal handler cannot be installed.
pub async fn run_ingestor(config: AppConfig) -> Result<()> {
    let enabled_sources: Vec<IngestSource> = config
        .ingest
        .sources
        .iter()
        .filter(|src| src.enabled)
        .cloned()
        .collect();

    if enabled_sources.is_empty() {
        return Err(anyhow::anyhow!(
            "no enabled ingest sources found in config.ingest.sources"
        ));
    }

    let clickhouse = ClickHouseClient::new(config.clickhouse.clone())?;
    clickhouse.ping().await.context("clickhouse ping failed")?;

    // Resume positions: latest checkpoint per (source_name, source_file).
    let checkpoint_map = load_checkpoints(&clickhouse)
        .await
        .context("failed to load checkpoints from clickhouse")?;

    info!(
        "loaded {} checkpoints across {} sources",
        checkpoint_map.len(),
        enabled_sources.len()
    );

    let checkpoints = Arc::new(RwLock::new(checkpoint_map));
    let dispatch = Arc::new(Mutex::new(DispatchState::default()));
    let metrics = Arc::new(Metrics::default());

    // Process queue is sized well above the sink's inflight limit so bursts of
    // file notifications don't immediately backpressure the debounce task.
    let process_queue_capacity = config
        .ingest
        .max_inflight_batches
        .saturating_mul(16)
        .max(1024);
    let (process_tx, mut process_rx) = mpsc::channel::<WorkItem>(process_queue_capacity);
    let (sink_tx, sink_rx) =
        mpsc::channel::<SinkMessage>(config.ingest.max_inflight_batches.max(16));
    // Watcher threads are plain OS threads, so they feed an unbounded channel
    // (no async send available from a non-tokio thread).
    let (watch_path_tx, watch_path_rx) = mpsc::unbounded_channel::<WorkItem>();

    let sink_handle = spawn_sink_task(
        config.clone(),
        clickhouse.clone(),
        checkpoints.clone(),
        metrics.clone(),
        sink_rx,
        dispatch.clone(),
    );

    // Caps the number of files processed concurrently by worker tasks.
    let sem = Arc::new(Semaphore::new(config.ingest.max_file_workers.max(1)));
    let processor_handle = {
        let process_tx_clone = process_tx.clone();
        let sink_tx_clone = sink_tx.clone();
        let checkpoints_clone = checkpoints.clone();
        let dispatch_clone = dispatch.clone();
        let sem_clone = sem.clone();
        let metrics_clone = metrics.clone();
        let cfg_clone = config.clone();

        // Dispatcher loop: pulls WorkItems off the process queue and spawns a
        // bounded worker task per item.
        tokio::spawn(async move {
            while let Some(work) = process_rx.recv().await {
                // queue_depth was incremented by the enqueuer; balance it here.
                metrics_clone.queue_depth.fetch_sub(1, Ordering::Relaxed);
                let key = work.key();

                // Transition pending -> inflight. Scoped block so the std
                // Mutex guard is dropped before the .await below.
                {
                    let mut state = dispatch_clone.lock().expect("dispatch mutex poisoned");
                    state.pending.remove(&key);
                    state.inflight.insert(key.clone());
                }

                // acquire_owned only errs if the semaphore is closed; it is
                // never closed explicitly here, so Err effectively cannot
                // occur — breaking out is the conservative response.
                let permit = match sem_clone.clone().acquire_owned().await {
                    Ok(permit) => permit,
                    Err(_) => break,
                };

                let sink_tx_worker = sink_tx_clone.clone();
                let process_tx_worker = process_tx_clone.clone();
                let checkpoints_worker = checkpoints_clone.clone();
                let dispatch_worker = dispatch_clone.clone();
                let cfg_worker = cfg_clone.clone();
                let metrics_worker = metrics_clone.clone();

                tokio::spawn(async move {
                    // Holds the semaphore permit for the task's lifetime;
                    // released on drop when the task finishes.
                    let _permit = permit;
                    if let Err(exc) = process_file(
                        &cfg_worker,
                        &work,
                        checkpoints_worker,
                        sink_tx_worker,
                        &metrics_worker,
                    )
                    .await
                    {
                        // Errors are recorded but do not stop the pipeline;
                        // the file may be retried via the dirty/reschedule
                        // path or the reconcile task.
                        error!(
                            "failed processing {}:{}: {exc}",
                            work.source_name, work.path
                        );
                        *metrics_worker
                            .last_error
                            .lock()
                            .expect("metrics last_error mutex poisoned") = exc.to_string();
                    }

                    // complete_work clears inflight state and may hand back an
                    // item that was re-notified (dirtied) while we worked.
                    let reschedule = complete_work(&key, &dispatch_worker);

                    if let Some(item) = reschedule {
                        // send fails only when the channel is closed during
                        // shutdown; dropping the item is fine then.
                        if process_tx_worker.send(item).await.is_ok() {
                            metrics_worker.queue_depth.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
        })
    };

    // Debounce collapses bursts of watcher notifications per file before they
    // reach the process queue.
    let debounce_handle = spawn_debounce_task(
        config.clone(),
        watch_path_rx,
        process_tx.clone(),
        dispatch.clone(),
        metrics.clone(),
    );

    // Periodically re-enumerates source globs to catch anything the watchers
    // missed — confirm cadence/behavior against `reconcile`.
    let reconcile_handle = spawn_reconcile_task(
        config.clone(),
        enabled_sources.clone(),
        process_tx.clone(),
        dispatch.clone(),
        metrics.clone(),
    );

    let watcher_threads =
        spawn_watcher_threads(enabled_sources.clone(), watch_path_tx, metrics.clone())?;

    // Optional startup backfill: queue every existing file matching each
    // source glob so historical data is ingested, not just new writes.
    if config.ingest.backfill_on_start {
        for source in &enabled_sources {
            let files = enumerate_jsonl_files(&source.glob)?;
            info!(
                "startup backfill queueing {} files for source={}",
                files.len(),
                source.name
            );
            for path in files {
                enqueue_work(
                    WorkItem {
                        source_name: source.name.clone(),
                        provider: source.provider.clone(),
                        path,
                    },
                    &process_tx,
                    &dispatch,
                    &metrics,
                )
                .await;
            }
        }
    }

    info!("rust ingestor running; waiting for shutdown signal");
    tokio::signal::ctrl_c()
        .await
        .context("signal handler failed")?;
    info!("shutdown signal received");

    // Drop our sender halves so channel-driven loops can observe closure,
    // then abort the long-running tasks outright. NOTE(review): aborting the
    // sink task may discard batches still in flight — confirm whether a
    // graceful drain is intended before abort.
    drop(process_tx);
    drop(sink_tx);

    debounce_handle.abort();
    reconcile_handle.abort();
    processor_handle.abort();
    sink_handle.abort();

    // NOTE(review): this loop is a no-op — `handle.thread().id()` reads an id
    // and discards it; the watcher threads are never joined or signalled to
    // stop. If a clean join is intended, the watchers need a shutdown flag.
    for handle in watcher_threads {
        let _ = handle.thread().id();
    }

    Ok(())
}
255
/// Deserialization target for the checkpoint query in `load_checkpoints`.
/// Field names must match the SELECT column aliases exactly.
#[derive(Deserialize)]
struct CheckpointRow {
    source_name: String,
    source_file: String,
    source_inode: u64,
    source_generation: u32,
    last_offset: u64,
    last_line_no: u64,
    status: String,
}
266
267 async fn load_checkpoints(
268 clickhouse: &ClickHouseClient,
269 ) -> Result<HashMap<String, model::Checkpoint>> {
270 let query = format!(
271 "SELECT \
272 source_name, \
273 source_file, \
274 toUInt64(argMax(source_inode, updated_at)) AS source_inode, \
275 toUInt32(argMax(source_generation, updated_at)) AS source_generation, \
276 toUInt64(argMax(last_offset, updated_at)) AS last_offset, \
277 toUInt64(argMax(last_line_no, updated_at)) AS last_line_no, \
278 argMax(status, updated_at) AS status \
279 FROM {}.ingest_checkpoints \
280 GROUP BY source_name, source_file",
281 clickhouse.config().database
282 );
283
284 let rows: Vec<CheckpointRow> = clickhouse.query_rows(&query, None).await?;
285 let mut map = HashMap::<String, model::Checkpoint>::new();
286
287 for row in rows {
288 let key = checkpoint_key(&row.source_name, &row.source_file);
289 map.insert(
290 key,
291 model::Checkpoint {
292 source_name: row.source_name,
293 source_file: row.source_file,
294 source_inode: row.source_inode,
295 source_generation: row.source_generation.max(1),
296 last_offset: row.last_offset,
297 last_line_no: row.last_line_no,
298 status: row.status,
299 },
300 );
301 }
302
303 Ok(map)
304 }