rust/ingestor/src/ingestor.rs
1 use crate::clickhouse::ClickHouseClient;
2 use crate::config::{AppConfig, IngestSource};
3 use crate::model::{Checkpoint, RowBatch};
4 use crate::normalize::normalize_record;
5 use anyhow::{Context, Result};
6 use glob::glob;
7 use notify::{Event, RecursiveMode, Watcher};
8 use serde_json::{json, Value};
9 use std::collections::{HashMap, HashSet};
10 use std::io::{BufRead, BufReader, Seek, SeekFrom};
11 use std::sync::atomic::{AtomicU64, Ordering};
12 use std::sync::{Arc, Mutex};
13 use std::time::{Duration, Instant};
14 use tokio::sync::{mpsc, RwLock, Semaphore};
15 use tokio::task::JoinHandle;
16 use tracing::{debug, error, info, warn};
17
18 #[cfg(unix)]
19 use std::os::unix::fs::MetadataExt;
20
/// A unit of ingestion work: one JSONL file belonging to one configured source.
#[derive(Debug, Clone)]
struct WorkItem {
    // Name of the configured ingest source this file belongs to.
    source_name: String,
    // Provider label from the source config; forwarded into error rows and
    // passed to normalize_record.
    provider: String,
    // Filesystem path of the JSONL file to process.
    path: String,
}
27
28 impl WorkItem {
29 fn key(&self) -> String {
30 format!("{}\n{}", self.source_name, self.path)
31 }
32 }
33
/// Bookkeeping that deduplicates work across the queue and workers.
/// All collections are keyed by `WorkItem::key()`.
#[derive(Default)]
struct DispatchState {
    // Keys queued on the process channel but not yet picked up by a worker.
    pending: HashSet<String>,
    // Keys currently being processed by a worker task.
    inflight: HashSet<String>,
    // Keys that saw new events while inflight; rescheduled when the worker finishes.
    dirty: HashSet<String>,
    // Latest WorkItem observed for each key, used when rescheduling dirty keys.
    item_by_key: HashMap<String, WorkItem>,
}
41
/// Process-wide counters, reported in the periodic heartbeat rows.
#[derive(Default)]
struct Metrics {
    // Rows successfully flushed to the raw_events table.
    raw_rows_written: AtomicU64,
    // Rows successfully flushed to the events table.
    event_rows_written: AtomicU64,
    // Rows successfully flushed to the ingest_errors table.
    err_rows_written: AtomicU64,
    // Duration of the most recent successful flush, in milliseconds.
    last_flush_ms: AtomicU64,
    // Count of flush attempts that failed (buffers are retained for retry).
    flush_failures: AtomicU64,
    // Work items sent on the process channel and not yet dequeued.
    queue_depth: AtomicU64,
    // Text of the most recent processing/flush error ("" until one occurs).
    last_error: Mutex<String>,
}
52
/// Messages accepted by the sink task; currently only row batches.
#[derive(Debug)]
enum SinkMessage {
    // A batch of normalized rows plus an optional checkpoint to commit.
    Batch(RowBatch),
}
57
/// Top-level entry point: wires together watchers, debounce, reconcile,
/// processor, and sink tasks, then blocks until Ctrl-C.
///
/// Data flow:
///   notify watcher threads -> watch channel (unbounded)
///     -> debounce task -> process channel (bounded)
///     -> processor loop (per-file worker tasks, capped by a semaphore)
///     -> sink channel -> sink task -> ClickHouse.
///
/// # Errors
/// Fails fast if no sources are enabled, ClickHouse is unreachable, the
/// checkpoint load fails, a source glob is invalid during startup backfill,
/// or the signal handler cannot be installed.
pub async fn run_ingestor(config: AppConfig) -> Result<()> {
    let enabled_sources: Vec<IngestSource> = config
        .ingest
        .sources
        .iter()
        .filter(|src| src.enabled)
        .cloned()
        .collect();

    if enabled_sources.is_empty() {
        return Err(anyhow::anyhow!(
            "no enabled ingest sources found in config.ingest.sources"
        ));
    }

    // Fail fast if ClickHouse is unreachable before spawning anything.
    let clickhouse = ClickHouseClient::new(config.clickhouse.clone())?;
    clickhouse.ping().await.context("clickhouse ping failed")?;

    let checkpoint_map = clickhouse
        .load_checkpoints()
        .await
        .context("failed to load checkpoints from clickhouse")?;

    info!(
        "loaded {} checkpoints across {} sources",
        checkpoint_map.len(),
        enabled_sources.len()
    );

    let checkpoints = Arc::new(RwLock::new(checkpoint_map));
    let dispatch = Arc::new(Mutex::new(DispatchState::default()));
    let metrics = Arc::new(Metrics::default());

    // Process queue is sized well above the inflight-batch budget so that
    // reconcile scans can enqueue bursts without blocking senders.
    let process_queue_capacity = config
        .ingest
        .max_inflight_batches
        .saturating_mul(16)
        .max(1024);
    let (process_tx, mut process_rx) = mpsc::channel::<WorkItem>(process_queue_capacity);
    let (sink_tx, sink_rx) =
        mpsc::channel::<SinkMessage>(config.ingest.max_inflight_batches.max(16));
    let (watch_path_tx, watch_path_rx) = mpsc::unbounded_channel::<WorkItem>();

    let sink_handle = spawn_sink_task(
        config.clone(),
        clickhouse.clone(),
        checkpoints.clone(),
        metrics.clone(),
        sink_rx,
        dispatch.clone(),
    );

    // Semaphore caps the number of files processed concurrently.
    let sem = Arc::new(Semaphore::new(config.ingest.max_file_workers.max(1)));
    let processor_handle = {
        let process_tx_clone = process_tx.clone();
        let sink_tx_clone = sink_tx.clone();
        let checkpoints_clone = checkpoints.clone();
        let dispatch_clone = dispatch.clone();
        let sem_clone = sem.clone();
        let metrics_clone = metrics.clone();
        let cfg_clone = config.clone();

        tokio::spawn(async move {
            while let Some(work) = process_rx.recv().await {
                // NOTE(review): enqueue_work increments queue_depth only after
                // its send() succeeds; if this recv races ahead of that
                // increment, the u64 counter can transiently underflow (wrap)
                // -- confirm whether the heartbeat value tolerates this.
                metrics_clone.queue_depth.fetch_sub(1, Ordering::Relaxed);
                let key = work.key();

                // Move the key from pending to inflight before spawning the
                // worker, so concurrent enqueues mark it dirty instead of
                // re-sending.
                {
                    let mut state = dispatch_clone.lock().expect("dispatch mutex poisoned");
                    state.pending.remove(&key);
                    state.inflight.insert(key.clone());
                }

                // acquire_owned only errs if the semaphore is closed; it never
                // is here, so Err is a defensive exit.
                let permit = match sem_clone.clone().acquire_owned().await {
                    Ok(permit) => permit,
                    Err(_) => break,
                };

                let sink_tx_worker = sink_tx_clone.clone();
                let process_tx_worker = process_tx_clone.clone();
                let checkpoints_worker = checkpoints_clone.clone();
                let dispatch_worker = dispatch_clone.clone();
                let cfg_worker = cfg_clone.clone();
                let metrics_worker = metrics_clone.clone();

                tokio::spawn(async move {
                    // Held for the worker's lifetime; released on drop.
                    let _permit = permit;
                    if let Err(exc) = process_file(
                        &cfg_worker,
                        &work,
                        checkpoints_worker,
                        sink_tx_worker,
                        &metrics_worker,
                    )
                    .await
                    {
                        error!(
                            "failed processing {}:{}: {exc}",
                            work.source_name, work.path
                        );
                        *metrics_worker
                            .last_error
                            .lock()
                            .expect("metrics last_error mutex poisoned") = exc.to_string();
                    }

                    // If new events arrived while we were processing (dirty),
                    // move the key back to pending and re-send it.
                    let mut reschedule: Option<WorkItem> = None;
                    {
                        let mut state = dispatch_worker.lock().expect("dispatch mutex poisoned");
                        state.inflight.remove(&key);
                        if state.dirty.remove(&key) {
                            if state.pending.insert(key.clone()) {
                                reschedule = state.item_by_key.get(&key).cloned();
                            }
                        }
                    }

                    if let Some(item) = reschedule {
                        if process_tx_worker.send(item).await.is_ok() {
                            metrics_worker.queue_depth.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
        })
    };

    let debounce_handle = spawn_debounce_task(
        config.clone(),
        watch_path_rx,
        process_tx.clone(),
        dispatch.clone(),
        metrics.clone(),
    );

    let reconcile_handle = spawn_reconcile_task(
        config.clone(),
        enabled_sources.clone(),
        process_tx.clone(),
        dispatch.clone(),
        metrics.clone(),
    );

    let watcher_threads = spawn_watcher_threads(enabled_sources.clone(), watch_path_tx)?;

    // Optional one-shot scan so pre-existing files are ingested on startup
    // without waiting for the first reconcile tick.
    if config.ingest.backfill_on_start {
        for source in &enabled_sources {
            let files = enumerate_jsonl_files(&source.glob)?;
            info!(
                "startup backfill queueing {} files for source={}",
                files.len(),
                source.name
            );
            for path in files {
                enqueue_work(
                    WorkItem {
                        source_name: source.name.clone(),
                        provider: source.provider.clone(),
                        path,
                    },
                    &process_tx,
                    &dispatch,
                    &metrics,
                )
                .await;
            }
        }
    }

    info!("rust ingestor running; waiting for shutdown signal");
    tokio::signal::ctrl_c()
        .await
        .context("signal handler failed")?;
    info!("shutdown signal received");

    // Close the channels so receivers observe end-of-stream.
    drop(process_tx);
    drop(sink_tx);

    // NOTE(review): aborting the sink task can skip its final drain flush;
    // awaiting sink_handle after dropping sink_tx would let buffered rows
    // flush -- confirm whether losing the tail on shutdown is acceptable.
    debounce_handle.abort();
    reconcile_handle.abort();
    processor_handle.abort();
    sink_handle.abort();

    // NOTE(review): this loop only reads each thread id -- it does not join,
    // so the watcher threads are effectively detached at shutdown (joining
    // would block, since their recv loop never ends on its own).
    for handle in watcher_threads {
        let _ = handle.thread().id();
    }

    Ok(())
}
247
/// Starts one OS thread per source running a blocking `notify` watcher on the
/// source's `watch_root` (recursively).
///
/// Each thread forwards every path from every filesystem event, unfiltered,
/// into the unbounded watch channel; extension filtering and debouncing
/// happen downstream. Errors are reported with `eprintln!` because these
/// threads run outside the tokio runtime (no tracing subscriber guarantees).
///
/// Returns the thread handles. The recv loop only exits when its internal
/// event channel disconnects; in practice the threads run for the life of
/// the process.
fn spawn_watcher_threads(
    sources: Vec<IngestSource>,
    tx: mpsc::UnboundedSender<WorkItem>,
) -> Result<Vec<std::thread::JoinHandle<()>>> {
    let mut handles = Vec::<std::thread::JoinHandle<()>>::new();

    for source in sources {
        let source_name = source.name.clone();
        let provider = source.provider.clone();
        let watch_root = std::path::PathBuf::from(source.watch_root.clone());
        let tx_clone = tx.clone();

        info!(
            "starting watcher on {} (source={}, provider={})",
            watch_root.display(),
            source_name,
            provider
        );

        let handle = std::thread::spawn(move || {
            // Bridge notify's callback API into a blocking std channel.
            let (event_tx, event_rx) = std::sync::mpsc::channel::<notify::Result<Event>>();

            let mut watcher = match notify::recommended_watcher(move |res| {
                let _ = event_tx.send(res);
            }) {
                Ok(watcher) => watcher,
                Err(exc) => {
                    // Creation failure disables this source's watcher only;
                    // reconcile scans still cover it.
                    eprintln!(
                        "[moraine-rust] failed to create watcher for {}: {exc}",
                        source_name
                    );
                    return;
                }
            };

            if let Err(exc) = watcher.watch(watch_root.as_path(), RecursiveMode::Recursive) {
                eprintln!(
                    "[moraine-rust] failed to watch {} ({}): {exc}",
                    watch_root.display(),
                    source_name
                );
                return;
            }

            loop {
                match event_rx.recv() {
                    Ok(Ok(event)) => {
                        // Forward every touched path; downstream decides relevance.
                        for path in event.paths {
                            let _ = tx_clone.send(WorkItem {
                                source_name: source_name.clone(),
                                provider: provider.clone(),
                                path: path.to_string_lossy().to_string(),
                            });
                        }
                    }
                    Ok(Err(exc)) => {
                        eprintln!("[moraine-rust] watcher event error ({source_name}): {exc}");
                    }
                    // Channel disconnected: watcher is gone, stop the thread.
                    Err(_) => break,
                }
            }
        });

        handles.push(handle);
    }

    Ok(handles)
}
316
/// Spawns the debounce task: collapses bursts of watcher events per file key
/// and forwards a key to the process queue only after it has been quiet for
/// the configured debounce window.
fn spawn_debounce_task(
    config: AppConfig,
    mut rx: mpsc::UnboundedReceiver<WorkItem>,
    process_tx: mpsc::Sender<WorkItem>,
    dispatch: Arc<Mutex<DispatchState>>,
    metrics: Arc<Metrics>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        // Floor of 5 ms so a zero config value still debounces briefly.
        let debounce = Duration::from_millis(config.ingest.debounce_ms.max(5));
        // Last-seen time per work key; re-inserting restarts the timer.
        let mut pending = HashMap::<String, (WorkItem, Instant)>::new();
        // Poll for ready keys at half the debounce window (min 10 ms; a zero
        // interval would make tokio::time::interval panic).
        let mut tick = tokio::time::interval(Duration::from_millis(
            (config.ingest.debounce_ms / 2).max(10),
        ));

        loop {
            tokio::select! {
                maybe_work = rx.recv() => {
                    match maybe_work {
                        Some(work) => {
                            // Newest event wins; the quiet-period timer restarts.
                            pending.insert(work.key(), (work, Instant::now()));
                        }
                        // Watcher side closed: nothing more to debounce.
                        None => break,
                    }
                }
                _ = tick.tick() => {
                    if pending.is_empty() {
                        continue;
                    }

                    let now = Instant::now();
                    // Keys whose last event is older than the debounce window.
                    let ready: Vec<String> = pending
                        .iter()
                        .filter_map(|(key, (_, seen_at))| {
                            if now.duration_since(*seen_at) >= debounce {
                                Some(key.clone())
                            } else {
                                None
                            }
                        })
                        .collect();

                    for key in ready {
                        if let Some((work, _)) = pending.remove(&key) {
                            // Watchers forward every path unfiltered; drop
                            // non-JSONL files here.
                            if !work.path.ends_with(".jsonl") {
                                continue;
                            }

                            enqueue_work(work, &process_tx, &dispatch, &metrics).await;
                        }
                    }
                }
            }
        }
    })
}
372
/// Spawns the reconcile task: periodically re-globs every source so files
/// whose filesystem events were missed (or that appeared while a watcher was
/// unavailable) still get picked up.
fn spawn_reconcile_task(
    config: AppConfig,
    sources: Vec<IngestSource>,
    process_tx: mpsc::Sender<WorkItem>,
    dispatch: Arc<Mutex<DispatchState>>,
    metrics: Arc<Metrics>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        // Floor of 5 s keeps a misconfigured interval from busy-scanning.
        let interval = Duration::from_secs_f64(config.ingest.reconcile_interval_seconds.max(5.0));
        let mut ticker = tokio::time::interval(interval);

        loop {
            ticker.tick().await;
            for source in &sources {
                match enumerate_jsonl_files(&source.glob) {
                    Ok(paths) => {
                        debug!(
                            "reconcile scanning {} files for source={}",
                            paths.len(),
                            source.name
                        );
                        for path in paths {
                            // enqueue_work deduplicates against pending/
                            // inflight keys, and process_file skips files with
                            // no new bytes, so re-enqueueing unchanged files
                            // each cycle is cheap.
                            enqueue_work(
                                WorkItem {
                                    source_name: source.name.clone(),
                                    provider: source.provider.clone(),
                                    path,
                                },
                                &process_tx,
                                &dispatch,
                                &metrics,
                            )
                            .await;
                        }
                    }
                    Err(exc) => {
                        // A failed scan skips this cycle for the source only.
                        warn!("reconcile scan failed for source={}: {exc}", source.name);
                    }
                }
            }
        }
    })
}
416
/// Spawns the single sink task: buffers row batches from file workers,
/// flushes them to ClickHouse on size or time triggers, and inserts a
/// periodic heartbeat row with current metrics.
fn spawn_sink_task(
    config: AppConfig,
    clickhouse: ClickHouseClient,
    checkpoints: Arc<RwLock<HashMap<String, Checkpoint>>>,
    metrics: Arc<Metrics>,
    mut rx: mpsc::Receiver<SinkMessage>,
    dispatch: Arc<Mutex<DispatchState>>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        // Accumulation buffers, one per destination table.
        let mut raw_rows = Vec::<Value>::new();
        let mut event_rows = Vec::<Value>::new();
        let mut link_rows = Vec::<Value>::new();
        let mut tool_rows = Vec::<Value>::new();
        let mut error_rows = Vec::<Value>::new();
        // Newest-wins checkpoint updates, deduplicated per (source, file).
        let mut checkpoint_updates = HashMap::<String, Checkpoint>::new();

        // Floors guard against pathological config values (an interval of 0
        // would make tokio::time::interval panic).
        let flush_interval =
            Duration::from_secs_f64(config.ingest.flush_interval_seconds.max(0.05));
        let heartbeat_interval =
            Duration::from_secs_f64(config.ingest.heartbeat_interval_seconds.max(1.0));

        let mut flush_tick = tokio::time::interval(flush_interval);
        let mut heartbeat_tick = tokio::time::interval(heartbeat_interval);

        loop {
            tokio::select! {
                maybe_msg = rx.recv() => {
                    match maybe_msg {
                        Some(SinkMessage::Batch(batch)) => {
                            raw_rows.extend(batch.raw_rows);
                            event_rows.extend(batch.event_rows);
                            link_rows.extend(batch.link_rows);
                            tool_rows.extend(batch.tool_rows);
                            error_rows.extend(batch.error_rows);
                            if let Some(cp) = batch.checkpoint {
                                merge_checkpoint(&mut checkpoint_updates, cp);
                            }

                            // Size-triggered flush once the combined buffers
                            // reach batch_size.
                            let total_rows = raw_rows.len() + event_rows.len() + link_rows.len() + tool_rows.len() + error_rows.len();
                            if total_rows >= config.ingest.batch_size {
                                flush_pending(
                                    &clickhouse,
                                    &checkpoints,
                                    &metrics,
                                    &mut raw_rows,
                                    &mut event_rows,
                                    &mut link_rows,
                                    &mut tool_rows,
                                    &mut error_rows,
                                    &mut checkpoint_updates,
                                ).await;
                            }
                        }
                        // All senders dropped: leave the loop and drain below.
                        None => break,
                    }
                }
                _ = flush_tick.tick() => {
                    // Time-triggered flush of whatever is buffered.
                    if !(raw_rows.is_empty() && event_rows.is_empty() && link_rows.is_empty() && tool_rows.is_empty() && error_rows.is_empty() && checkpoint_updates.is_empty()) {
                        flush_pending(
                            &clickhouse,
                            &checkpoints,
                            &metrics,
                            &mut raw_rows,
                            &mut event_rows,
                            &mut link_rows,
                            &mut tool_rows,
                            &mut error_rows,
                            &mut checkpoint_updates,
                        ).await;
                    }
                }
                _ = heartbeat_tick.tick() => {
                    // Snapshot gauges; locks are held only within these blocks.
                    let files_active = {
                        let state = dispatch.lock().expect("dispatch mutex poisoned");
                        state.inflight.len() as u32
                    };
                    let files_watched = checkpoints.read().await.len() as u32;
                    let last_error = {
                        metrics
                            .last_error
                            .lock()
                            .expect("metrics last_error mutex poisoned")
                            .clone()
                    };

                    let heartbeat = json!({
                        "host": host_name(),
                        "service_version": env!("CARGO_PKG_VERSION"),
                        "queue_depth": metrics.queue_depth.load(Ordering::Relaxed),
                        "files_active": files_active,
                        "files_watched": files_watched,
                        "rows_raw_written": metrics.raw_rows_written.load(Ordering::Relaxed),
                        "rows_events_written": metrics.event_rows_written.load(Ordering::Relaxed),
                        "rows_errors_written": metrics.err_rows_written.load(Ordering::Relaxed),
                        "flush_latency_ms": metrics.last_flush_ms.load(Ordering::Relaxed) as u32,
                        // Latency percentiles are not yet measured; reported as 0.
                        "append_to_visible_p50_ms": 0u32,
                        "append_to_visible_p95_ms": 0u32,
                        "last_error": last_error,
                    });

                    // Heartbeat failures are non-fatal; just log and continue.
                    if let Err(exc) = clickhouse.insert_json_rows("ingest_heartbeats", &[heartbeat]).await {
                        warn!("heartbeat insert failed: {exc}");
                    }
                }
            }
        }

        // Final drain after the channel closes so buffered rows are not lost.
        // NOTE(review): run_ingestor aborts this task on shutdown, which can
        // preempt this drain -- confirm whether that is intended.
        if !(raw_rows.is_empty()
            && event_rows.is_empty()
            && link_rows.is_empty()
            && tool_rows.is_empty()
            && error_rows.is_empty()
            && checkpoint_updates.is_empty())
        {
            flush_pending(
                &clickhouse,
                &checkpoints,
                &metrics,
                &mut raw_rows,
                &mut event_rows,
                &mut link_rows,
                &mut tool_rows,
                &mut error_rows,
                &mut checkpoint_updates,
            )
            .await;
        }
    })
}
546
/// Composite map key for a (source, file) pair: the two parts joined by a
/// newline, matching the key format produced by `WorkItem::key`.
fn checkpoint_key(source_name: &str, source_file: &str) -> String {
    [source_name, source_file].join("\n")
}
550
551 fn merge_checkpoint(pending: &mut HashMap<String, Checkpoint>, checkpoint: Checkpoint) {
552 let key = checkpoint_key(&checkpoint.source_name, &checkpoint.source_file);
553 match pending.get(&key) {
554 None => {
555 pending.insert(key, checkpoint);
556 }
557 Some(existing) => {
558 let replace = checkpoint.source_generation > existing.source_generation
559 || (checkpoint.source_generation == existing.source_generation
560 && checkpoint.last_offset >= existing.last_offset);
561 if replace {
562 pending.insert(key, checkpoint);
563 }
564 }
565 }
566 }
567
/// Writes all buffered rows plus checkpoint updates to ClickHouse.
///
/// On success: counters are bumped, committed checkpoints are folded into the
/// shared checkpoint map (so process_file resumes from the new offsets), and
/// all buffers are cleared. On failure: everything is retained so the next
/// flush retries the same data; the error is recorded in metrics.
async fn flush_pending(
    clickhouse: &ClickHouseClient,
    checkpoints: &Arc<RwLock<HashMap<String, Checkpoint>>>,
    metrics: &Arc<Metrics>,
    raw_rows: &mut Vec<Value>,
    event_rows: &mut Vec<Value>,
    link_rows: &mut Vec<Value>,
    tool_rows: &mut Vec<Value>,
    error_rows: &mut Vec<Value>,
    checkpoint_updates: &mut HashMap<String, Checkpoint>,
) {
    let started = Instant::now();

    // Serialize checkpoint structs into JSON rows for the checkpoint table.
    let checkpoint_rows: Vec<Value> = checkpoint_updates
        .values()
        .map(|cp| {
            json!({
                "source_name": cp.source_name,
                "source_file": cp.source_file,
                "source_inode": cp.source_inode,
                "source_generation": cp.source_generation,
                "last_offset": cp.last_offset,
                "last_line_no": cp.last_line_no,
                "status": cp.status,
            })
        })
        .collect();

    // Inserts run sequentially and are NOT atomic across tables.
    // NOTE(review): if a later insert fails after an earlier one succeeded,
    // the whole set is retried next flush, which can duplicate rows in the
    // tables that already succeeded -- presumably deduplicated downstream;
    // confirm the table engines tolerate replays.
    let flush_result = async {
        clickhouse.insert_json_rows("raw_events", raw_rows).await?;
        clickhouse.insert_json_rows("events", event_rows).await?;
        clickhouse.insert_json_rows("event_links", link_rows).await?;
        clickhouse.insert_json_rows("tool_io", tool_rows).await?;
        clickhouse.insert_json_rows("ingest_errors", error_rows).await?;
        clickhouse
            .insert_json_rows("ingest_checkpoints", &checkpoint_rows)
            .await?;
        Result::<()>::Ok(())
    }
    .await;

    match flush_result {
        Ok(()) => {
            metrics
                .raw_rows_written
                .fetch_add(raw_rows.len() as u64, Ordering::Relaxed);
            metrics
                .event_rows_written
                .fetch_add(event_rows.len() as u64, Ordering::Relaxed);
            metrics
                .err_rows_written
                .fetch_add(error_rows.len() as u64, Ordering::Relaxed);
            metrics
                .last_flush_ms
                .store(started.elapsed().as_millis() as u64, Ordering::Relaxed);

            // Publish the committed checkpoints to the shared map.
            {
                let mut state = checkpoints.write().await;
                for cp in checkpoint_updates.values() {
                    let key = checkpoint_key(&cp.source_name, &cp.source_file);
                    state.insert(key, cp.clone());
                }
            }

            raw_rows.clear();
            event_rows.clear();
            link_rows.clear();
            tool_rows.clear();
            error_rows.clear();
            checkpoint_updates.clear();
        }
        Err(exc) => {
            // Buffers stay intact for retry; record the failure for heartbeats.
            metrics.flush_failures.fetch_add(1, Ordering::Relaxed);
            *metrics
                .last_error
                .lock()
                .expect("metrics last_error mutex poisoned") = exc.to_string();
            warn!("flush failed: {exc}");
        }
    }
}
649
650 async fn enqueue_work(
651 work: WorkItem,
652 process_tx: &mpsc::Sender<WorkItem>,
653 dispatch: &Arc<Mutex<DispatchState>>,
654 metrics: &Arc<Metrics>,
655 ) {
656 if !work.path.ends_with(".jsonl") {
657 return;
658 }
659
660 let key = work.key();
661 let mut should_send = false;
662 {
663 let mut state = dispatch.lock().expect("dispatch mutex poisoned");
664 state.item_by_key.insert(key.clone(), work.clone());
665 if state.inflight.contains(&key) {
666 state.dirty.insert(key.clone());
667 } else if state.pending.insert(key.clone()) {
668 should_send = true;
669 }
670 }
671
672 if should_send {
673 if process_tx.send(work).await.is_ok() {
674 metrics.queue_depth.fetch_add(1, Ordering::Relaxed);
675 }
676 }
677 }
678
/// Reads newly appended lines from one JSONL file and ships normalized row
/// batches to the sink.
///
/// Resumes from the committed checkpoint offset; detects rotation (inode
/// change) or truncation (file shrank) and restarts from offset 0 with a
/// bumped generation. Malformed lines become ingest_errors rows instead of
/// failing the file.
///
/// # Errors
/// Returns an error for open/seek/read failures or if the sink channel is
/// closed; a missing file is treated as success (it may have been rotated
/// away between the event and now).
async fn process_file(
    config: &AppConfig,
    work: &WorkItem,
    checkpoints: Arc<RwLock<HashMap<String, Checkpoint>>>,
    sink_tx: mpsc::Sender<SinkMessage>,
    metrics: &Arc<Metrics>,
) -> Result<()> {
    let source_file = &work.path;

    let meta = match std::fs::metadata(source_file) {
        Ok(meta) => meta,
        Err(exc) => {
            // File vanished: not an error, just nothing to do.
            debug!("metadata missing for {}: {}", source_file, exc);
            return Ok(());
        }
    };

    // Inode is the rotation signal on Unix; other platforms always see 0 and
    // thus cannot detect rotation this way (truncation detection still works).
    #[cfg(unix)]
    let inode = meta.ino();
    #[cfg(not(unix))]
    let inode = 0u64;

    let file_size = meta.len();
    let cp_key = checkpoint_key(&work.source_name, source_file);
    // Snapshot the committed checkpoint; the read lock is dropped immediately.
    let committed = { checkpoints.read().await.get(&cp_key).cloned() };

    // No committed checkpoint -> fresh file at generation 1, offset 0.
    let mut checkpoint = committed.unwrap_or(Checkpoint {
        source_name: work.source_name.clone(),
        source_file: source_file.to_string(),
        source_inode: inode,
        source_generation: 1,
        last_offset: 0,
        last_line_no: 0,
        status: "active".to_string(),
    });

    // Rotation or truncation starts a new generation from the beginning.
    let mut generation_changed = false;
    if checkpoint.source_inode != inode || file_size < checkpoint.last_offset {
        checkpoint.source_inode = inode;
        checkpoint.source_generation = checkpoint.source_generation.saturating_add(1).max(1);
        checkpoint.last_offset = 0;
        checkpoint.last_line_no = 0;
        checkpoint.status = "active".to_string();
        generation_changed = true;
    }

    // Nothing appended since the committed offset: done.
    if file_size == checkpoint.last_offset && !generation_changed {
        return Ok(());
    }

    let mut file = std::fs::File::open(source_file)
        .with_context(|| format!("failed to open {}", source_file))?;
    file.seek(SeekFrom::Start(checkpoint.last_offset))
        .with_context(|| format!("failed to seek {}", source_file))?;

    let mut reader = BufReader::new(file);
    let mut offset = checkpoint.last_offset;
    let mut line_no = checkpoint.last_line_no;
    // Hints threaded through normalize_record, carried line-to-line within
    // this call; they start empty on every invocation (not persisted).
    let mut session_hint = String::new();
    let mut model_hint = String::new();

    let mut batch = RowBatch::default();

    loop {
        let start_offset = offset;
        let mut buf = Vec::<u8>::new();
        // read_until (rather than lines()) keeps byte-accurate offsets even
        // when a line is not valid UTF-8.
        let bytes_read = reader
            .read_until(b'\n', &mut buf)
            .with_context(|| format!("failed reading {}", source_file))?;

        if bytes_read == 0 {
            break;
        }

        // Offsets advance by raw bytes consumed, including the newline.
        offset = offset.saturating_add(bytes_read as u64);
        line_no = line_no.saturating_add(1);

        let mut text = String::from_utf8_lossy(&buf).to_string();
        if text.ends_with('\n') {
            text.pop();
        }

        if text.trim().is_empty() {
            continue;
        }

        // Only JSON objects are ingestable; non-object JSON and parse
        // failures are recorded as error rows and skipped.
        let parsed: Value = match serde_json::from_str::<Value>(&text) {
            Ok(value) if value.is_object() => value,
            Ok(_) => {
                batch.error_rows.push(json!({
                    "source_name": work.source_name,
                    "provider": work.provider,
                    "source_file": source_file,
                    "source_inode": inode,
                    "source_generation": checkpoint.source_generation,
                    "source_line_no": line_no,
                    "source_offset": start_offset,
                    "error_kind": "json_parse_error",
                    "error_text": "Expected JSON object",
                    "raw_fragment": truncate(&text, 20_000),
                }));
                continue;
            }
            Err(exc) => {
                batch.error_rows.push(json!({
                    "source_name": work.source_name,
                    "provider": work.provider,
                    "source_file": source_file,
                    "source_inode": inode,
                    "source_generation": checkpoint.source_generation,
                    "source_line_no": line_no,
                    "source_offset": start_offset,
                    "error_kind": "json_parse_error",
                    "error_text": exc.to_string(),
                    "raw_fragment": truncate(&text, 20_000),
                }));
                continue;
            }
        };

        let normalized = normalize_record(
            &parsed,
            &work.source_name,
            &work.provider,
            source_file,
            inode,
            checkpoint.source_generation,
            line_no,
            start_offset,
            &session_hint,
            &model_hint,
        );

        session_hint = normalized.session_hint;
        model_hint = normalized.model_hint;
        batch.raw_rows.push(normalized.raw_row);
        batch.event_rows.extend(normalized.event_rows);
        batch.link_rows.extend(normalized.link_rows);
        batch.tool_rows.extend(normalized.tool_rows);
        batch.lines_processed = batch.lines_processed.saturating_add(1);

        // Ship an intermediate chunk (carrying its own checkpoint) once the
        // batch is full, so long files commit progress incrementally.
        if batch.row_count() >= config.ingest.batch_size {
            let mut chunk = RowBatch::default();
            chunk.raw_rows = std::mem::take(&mut batch.raw_rows);
            chunk.event_rows = std::mem::take(&mut batch.event_rows);
            chunk.link_rows = std::mem::take(&mut batch.link_rows);
            chunk.tool_rows = std::mem::take(&mut batch.tool_rows);
            chunk.error_rows = std::mem::take(&mut batch.error_rows);
            chunk.lines_processed = batch.lines_processed;
            batch.lines_processed = 0;
            chunk.checkpoint = Some(Checkpoint {
                source_name: work.source_name.clone(),
                source_file: source_file.to_string(),
                source_inode: inode,
                source_generation: checkpoint.source_generation,
                last_offset: offset,
                last_line_no: line_no,
                status: "active".to_string(),
            });

            sink_tx
                .send(SinkMessage::Batch(chunk))
                .await
                .context("sink channel closed while sending chunk")?;
        }
    }

    let final_checkpoint = Checkpoint {
        source_name: work.source_name.clone(),
        source_file: source_file.to_string(),
        source_inode: inode,
        source_generation: checkpoint.source_generation,
        last_offset: offset,
        last_line_no: line_no,
        status: "active".to_string(),
    };

    // Send the tail batch whenever there is anything to record: rows, a
    // generation bump, or plain offset progress (e.g. only blank lines read).
    if batch.row_count() > 0 || generation_changed || offset != checkpoint.last_offset {
        batch.checkpoint = Some(final_checkpoint);
        sink_tx
            .send(SinkMessage::Batch(batch))
            .await
            .context("sink channel closed while sending final batch")?;
    }

    if metrics.queue_depth.load(Ordering::Relaxed) == 0 {
        debug!(
            "{}:{} caught up at offset {}",
            work.source_name, source_file, offset
        );
    }

    Ok(())
}
873
/// Returns `input` limited to at most `max_chars` Unicode scalar values.
///
/// Single pass: `char_indices().nth(max_chars)` locates the byte index of the
/// first character past the limit (the original implementation scanned the
/// string twice -- once for `chars().count()` and once for `take()`).
/// Slicing at a `char_indices` boundary is always valid UTF-8.
fn truncate(input: &str, max_chars: usize) -> String {
    match input.char_indices().nth(max_chars) {
        // More than max_chars characters: keep everything before the cutoff.
        Some((byte_idx, _)) => input[..byte_idx].to_string(),
        // String fits within the limit: return it unchanged.
        None => input.to_string(),
    }
}
880
881 fn enumerate_jsonl_files(glob_pattern: &str) -> Result<Vec<String>> {
882 let mut files = Vec::<String>::new();
883 for entry in glob(glob_pattern).with_context(|| format!("invalid glob: {}", glob_pattern))? {
884 let path = match entry {
885 Ok(path) => path,
886 Err(exc) => {
887 warn!("glob iteration error: {exc}");
888 continue;
889 }
890 };
891
892 if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
893 files.push(path.to_string_lossy().to_string());
894 }
895 }
896 files.sort();
897 Ok(files)
898 }
899
/// Best-effort host identifier for heartbeat rows: the `HOSTNAME` env var
/// when set and non-blank, otherwise `USER` (even if blank), otherwise the
/// literal "localhost".
fn host_name() -> String {
    if let Ok(host) = std::env::var("HOSTNAME") {
        if !host.trim().is_empty() {
            return host;
        }
    }
    match std::env::var("USER") {
        Ok(user) => user,
        Err(_) => "localhost".to_string(),
    }
}