rust/ingestor/src/clickhouse.rs
1 use crate::config::ClickHouseConfig;
2 use crate::model::Checkpoint;
3 use anyhow::Result;
4 use moraine_clickhouse::ClickHouseClient as SharedClickHouseClient;
5 use serde_json::Value;
6 use std::collections::HashMap;
7
8 #[derive(Clone)]
9 pub struct ClickHouseClient {
10 inner: SharedClickHouseClient,
11 }
12
13 impl ClickHouseClient {
14 pub fn new(cfg: ClickHouseConfig) -> Result<Self> {
15 Ok(Self {
16 inner: SharedClickHouseClient::new(cfg)?,
17 })
18 }
19
20 pub async fn ping(&self) -> Result<()> {
21 self.inner.ping().await
22 }
23
24 pub async fn insert_json_rows(&self, table: &str, rows: &[Value]) -> Result<()> {
25 self.inner.insert_json_rows(table, rows).await
26 }
27
28 pub async fn load_checkpoints(&self) -> Result<HashMap<String, Checkpoint>> {
29 let query = format!(
30 "SELECT source_name, source_file, argMax(source_inode, updated_at), argMax(source_generation, updated_at), argMax(last_offset, updated_at), argMax(last_line_no, updated_at), argMax(status, updated_at) FROM {}.ingest_checkpoints GROUP BY source_name, source_file FORMAT TabSeparated",
31 self.inner.config().database
32 );
33
34 let raw = self
35 .inner
36 .request_text(&query, None, None, false, None)
37 .await?;
38
39 let mut map = HashMap::<String, Checkpoint>::new();
40
41 for line in raw.lines() {
42 if line.trim().is_empty() {
43 continue;
44 }
45 let fields: Vec<&str> = line.split('\t').collect();
46 if fields.len() < 7 {
47 continue;
48 }
49
50 let source_name = fields[0].to_string();
51 let source_file = fields[1].to_string();
52 let source_inode = fields[2].parse::<u64>().unwrap_or(0);
53 let source_generation = fields[3].parse::<u32>().unwrap_or(1);
54 let last_offset = fields[4].parse::<u64>().unwrap_or(0);
55 let last_line_no = fields[5].parse::<u64>().unwrap_or(0);
56 let status = fields[6].to_string();
57
58 map.insert(
59 format!("{}\n{}", source_name, source_file),
60 Checkpoint {
61 source_name,
62 source_file,
63 source_inode,
64 source_generation,
65 last_offset,
66 last_line_no,
67 status,
68 },
69 );
70 }
71
72 Ok(map)
73 }
74 }