Skip to content

rust/ingestor/src/clickhouse.rs


     1 use crate::config::ClickHouseConfig;
     2 use crate::model::Checkpoint;
     3 use anyhow::Result;
     4 use moraine_clickhouse::ClickHouseClient as SharedClickHouseClient;
     5 use serde_json::Value;
     6 use std::collections::HashMap;
     7  
     8 #[derive(Clone)]
     9 pub struct ClickHouseClient {
    10     inner: SharedClickHouseClient,
    11 }
    12  
    13 impl ClickHouseClient {
    14     pub fn new(cfg: ClickHouseConfig) -> Result<Self> {
    15         Ok(Self {
    16             inner: SharedClickHouseClient::new(cfg)?,
    17         })
    18     }
    19  
    20     pub async fn ping(&self) -> Result<()> {
    21         self.inner.ping().await
    22     }
    23  
    24     pub async fn insert_json_rows(&self, table: &str, rows: &[Value]) -> Result<()> {
    25         self.inner.insert_json_rows(table, rows).await
    26     }
    27  
    28     pub async fn load_checkpoints(&self) -> Result<HashMap<String, Checkpoint>> {
    29         let query = format!(
    30             "SELECT source_name, source_file, argMax(source_inode, updated_at), argMax(source_generation, updated_at), argMax(last_offset, updated_at), argMax(last_line_no, updated_at), argMax(status, updated_at) FROM {}.ingest_checkpoints GROUP BY source_name, source_file FORMAT TabSeparated",
    31             self.inner.config().database
    32         );
    33  
    34         let raw = self
    35             .inner
    36             .request_text(&query, None, None, false, None)
    37             .await?;
    38  
    39         let mut map = HashMap::<String, Checkpoint>::new();
    40  
    41         for line in raw.lines() {
    42             if line.trim().is_empty() {
    43                 continue;
    44             }
    45             let fields: Vec<&str> = line.split('\t').collect();
    46             if fields.len() < 7 {
    47                 continue;
    48             }
    49  
    50             let source_name = fields[0].to_string();
    51             let source_file = fields[1].to_string();
    52             let source_inode = fields[2].parse::<u64>().unwrap_or(0);
    53             let source_generation = fields[3].parse::<u32>().unwrap_or(1);
    54             let last_offset = fields[4].parse::<u64>().unwrap_or(0);
    55             let last_line_no = fields[5].parse::<u64>().unwrap_or(0);
    56             let status = fields[6].to_string();
    57  
    58             map.insert(
    59                 format!("{}\n{}", source_name, source_file),
    60                 Checkpoint {
    61                     source_name,
    62                     source_file,
    63                     source_inode,
    64                     source_generation,
    65                     last_offset,
    66                     last_line_no,
    67                     status,
    68                 },
    69             );
    70         }
    71  
    72         Ok(map)
    73     }
    74 }