Skip to content

crates/moraine-clickhouse/src/lib.rs


     1 use anyhow::{anyhow, bail, Context, Result};
     2 use moraine_config::ClickHouseConfig;
     3 use reqwest::{
     4     header::{CONTENT_LENGTH, CONTENT_TYPE},
     5     Client, Url,
     6 };
     7 use serde::de::DeserializeOwned;
     8 use serde::Deserialize;
     9 use serde_json::Value;
    10 use std::collections::HashSet;
    11 use std::time::Duration;
    12  
    13 #[derive(Clone)]
    14 pub struct ClickHouseClient {
    15     cfg: ClickHouseConfig,
    16     http: Client,
    17 }
    18  
    19 #[derive(Deserialize)]
    20 struct ClickHouseEnvelope<T> {
    21     data: Vec<T>,
    22 }
    23  
    24 #[derive(Debug, Clone)]
    25 pub struct Migration {
    26     pub version: &'static str,
    27     pub name: &'static str,
    28     pub sql: &'static str,
    29 }
    30  
    31 #[derive(Debug, Clone, serde::Serialize)]
    32 pub struct DoctorReport {
    33     pub clickhouse_healthy: bool,
    34     pub clickhouse_version: Option<String>,
    35     pub database: String,
    36     pub database_exists: bool,
    37     pub applied_migrations: Vec<String>,
    38     pub pending_migrations: Vec<String>,
    39     pub missing_tables: Vec<String>,
    40     pub errors: Vec<String>,
    41 }
    42  
    43 impl ClickHouseClient {
    44     pub fn new(cfg: ClickHouseConfig) -> Result<Self> {
    45         let timeout = Duration::from_secs_f64(cfg.timeout_seconds.max(1.0));
    46         let http = Client::builder()
    47             .timeout(timeout)
    48             .build()
    49             .context("failed to construct reqwest client")?;
    50  
    51         Ok(Self { cfg, http })
    52     }
    53  
    54     pub fn config(&self) -> &ClickHouseConfig {
    55         &self.cfg
    56     }
    57  
    58     fn base_url(&self) -> Result<Url> {
    59         Url::parse(&self.cfg.url).context("invalid ClickHouse URL")
    60     }
    61  
    62     pub async fn request_text(
    63         &self,
    64         query: &str,
    65         body: Option<Vec<u8>>,
    66         database: Option<&str>,
    67         async_insert: bool,
    68         default_format: Option<&str>,
    69     ) -> Result<String> {
    70         let mut url = self.base_url()?;
    71         {
    72             let mut qp = url.query_pairs_mut();
    73             qp.append_pair("query", query);
    74             if let Some(database) = database {
    75                 qp.append_pair("database", database);
    76             }
    77             if let Some(default_format) = default_format {
    78                 qp.append_pair("default_format", default_format);
    79             }
    80             if async_insert && self.cfg.async_insert {
    81                 qp.append_pair("async_insert", "1");
    82                 if self.cfg.wait_for_async_insert {
    83                     qp.append_pair("wait_for_async_insert", "1");
    84                 }
    85             }
    86         }
    87  
    88         // ClickHouse HTTP treats GET as readonly, so use POST for both reads and writes.
    89         let payload = body.unwrap_or_default();
    90         let payload_len = payload.len();
    91  
    92         let mut req = self
    93             .http
    94             .post(url)
    95             .header(CONTENT_TYPE, "text/plain; charset=utf-8")
    96             // Some ClickHouse builds require an explicit Content-Length on POST.
    97             .header(CONTENT_LENGTH, payload_len)
    98             .body(payload);
    99  
   100         if !self.cfg.username.is_empty() {
   101             req = req.basic_auth(self.cfg.username.clone(), Some(self.cfg.password.clone()));
   102         }
   103  
   104         let response = req.send().await.context("clickhouse request failed")?;
   105         let status = response.status();
   106         let text = response.text().await.with_context(|| {
   107             format!(
   108                 "failed to read clickhouse response body (status {})",
   109                 status
   110             )
   111         })?;
   112  
   113         if !status.is_success() {
   114             return Err(anyhow!("clickhouse returned {}: {}", status, text));
   115         }
   116  
   117         Ok(text)
   118     }
   119  
   120     pub async fn ping(&self) -> Result<()> {
   121         let response = self
   122             .request_text("SELECT 1", None, Some("system"), false, None)
   123             .await?;
   124         if response.trim() == "1" {
   125             Ok(())
   126         } else {
   127             Err(anyhow!("unexpected ping response: {}", response.trim()))
   128         }
   129     }
   130  
   131     pub async fn version(&self) -> Result<String> {
   132         let rows: Vec<Value> = self
   133             .query_json_data("SELECT version() AS version", Some("system"))
   134             .await?;
   135         let version = rows
   136             .first()
   137             .and_then(|row| row.get("version"))
   138             .and_then(Value::as_str)
   139             .ok_or_else(|| anyhow!("missing version in payload"))?;
   140  
   141         Ok(version.to_string())
   142     }
   143  
   144     pub async fn query_json_each_row<T: DeserializeOwned>(
   145         &self,
   146         query: &str,
   147         database: Option<&str>,
   148     ) -> Result<Vec<T>> {
   149         let database = database.or(Some(&self.cfg.database));
   150         let raw = self
   151             .request_text(query, None, database, false, None)
   152             .await?;
   153         let mut rows = Vec::new();
   154  
   155         for line in raw.lines() {
   156             if line.trim().is_empty() {
   157                 continue;
   158             }
   159             let row = serde_json::from_str::<T>(line)
   160                 .with_context(|| format!("failed to parse JSONEachRow line: {}", line))?;
   161             rows.push(row);
   162         }
   163  
   164         Ok(rows)
   165     }
   166  
   167     pub async fn query_json_data<T: DeserializeOwned>(
   168         &self,
   169         query: &str,
   170         database: Option<&str>,
   171     ) -> Result<Vec<T>> {
   172         let database = database.or(Some(&self.cfg.database));
   173         let raw = self
   174             .request_text(query, None, database, false, Some("JSON"))
   175             .await?;
   176         let envelope: ClickHouseEnvelope<T> = serde_json::from_str(&raw)
   177             .with_context(|| format!("invalid clickhouse JSON response: {}", raw))?;
   178         Ok(envelope.data)
   179     }
   180  
   181     pub async fn query_rows<T: DeserializeOwned>(
   182         &self,
   183         query: &str,
   184         database: Option<&str>,
   185     ) -> Result<Vec<T>> {
   186         if has_explicit_json_each_row_format(query) {
   187             return self.query_json_each_row(query, database).await;
   188         }
   189  
   190         match self.query_json_data(query, database).await {
   191             Ok(rows) => Ok(rows),
   192             Err(_) => self.query_json_each_row(query, database).await,
   193         }
   194     }
   195  
   196     pub async fn insert_json_rows(&self, table: &str, rows: &[Value]) -> Result<()> {
   197         if rows.is_empty() {
   198             return Ok(());
   199         }
   200  
   201         let mut payload = Vec::<u8>::new();
   202         for row in rows {
   203             let line = serde_json::to_vec(row).context("failed to encode JSON row")?;
   204             payload.extend_from_slice(&line);
   205             payload.push(b'\n');
   206         }
   207  
   208         let query = format!(
   209             "INSERT INTO {}.{} FORMAT JSONEachRow",
   210             escape_identifier(&self.cfg.database),
   211             escape_identifier(table)
   212         );
   213         self.request_text(&query, Some(payload), None, true, None)
   214             .await?;
   215         Ok(())
   216     }
   217  
   218     pub async fn run_migrations(&self) -> Result<Vec<String>> {
   219         validate_identifier(&self.cfg.database)?;
   220  
   221         self.request_text(
   222             &format!(
   223                 "CREATE DATABASE IF NOT EXISTS {}",
   224                 escape_identifier(&self.cfg.database)
   225             ),
   226             None,
   227             None,
   228             false,
   229             None,
   230         )
   231         .await?;
   232  
   233         self.ensure_migration_ledger().await?;
   234         let applied = self.applied_migration_versions().await?;
   235  
   236         let mut executed = Vec::new();
   237         for migration in bundled_migrations() {
   238             if applied.contains(migration.version) {
   239                 continue;
   240             }
   241  
   242             let sql = materialize_migration_sql(migration.sql, &self.cfg.database)?;
   243             for statement in split_sql_statements(&sql) {
   244                 self.request_text(&statement, None, Some(&self.cfg.database), false, None)
   245                     .await
   246                     .with_context(|| {
   247                         format!(
   248                             "failed migration {} statement: {}",
   249                             migration.name,
   250                             truncate_for_error(&statement)
   251                         )
   252                     })?;
   253             }
   254  
   255             let log_stmt = format!(
   256                 "INSERT INTO {}.schema_migrations (version, name) VALUES ({}, {})",
   257                 escape_identifier(&self.cfg.database),
   258                 escape_literal(migration.version),
   259                 escape_literal(migration.name)
   260             );
   261             self.request_text(&log_stmt, None, Some(&self.cfg.database), false, None)
   262                 .await
   263                 .with_context(|| format!("failed to record migration {}", migration.name))?;
   264  
   265             executed.push(migration.version.to_string());
   266         }
   267  
   268         Ok(executed)
   269     }
   270  
   271     pub async fn pending_migration_versions(&self) -> Result<Vec<String>> {
   272         self.ensure_migration_ledger().await?;
   273         let applied = self.applied_migration_versions().await?;
   274         Ok(bundled_migrations()
   275             .into_iter()
   276             .filter(|m| !applied.contains(m.version))
   277             .map(|m| m.version.to_string())
   278             .collect())
   279     }
   280  
   281     pub async fn doctor_report(&self) -> Result<DoctorReport> {
   282         let mut report = DoctorReport {
   283             clickhouse_healthy: false,
   284             clickhouse_version: None,
   285             database: self.cfg.database.clone(),
   286             database_exists: false,
   287             applied_migrations: Vec::new(),
   288             pending_migrations: Vec::new(),
   289             missing_tables: Vec::new(),
   290             errors: Vec::new(),
   291         };
   292  
   293         match self.ping().await {
   294             Ok(()) => {
   295                 report.clickhouse_healthy = true;
   296             }
   297             Err(err) => {
   298                 report.errors.push(format!("ping failed: {err}"));
   299                 return Ok(report);
   300             }
   301         }
   302  
   303         match self.version().await {
   304             Ok(version) => report.clickhouse_version = Some(version),
   305             Err(err) => report.errors.push(format!("version query failed: {err}")),
   306         }
   307  
   308         #[derive(Deserialize)]
   309         struct ExistsRow {
   310             exists: u8,
   311         }
   312  
   313         let exists_query = format!(
   314             "SELECT toUInt8(count() > 0) AS exists FROM system.databases WHERE name = {}",
   315             escape_literal(&self.cfg.database)
   316         );
   317  
   318         match self
   319             .query_json_data::<ExistsRow>(&exists_query, Some("system"))
   320             .await
   321         {
   322             Ok(rows) => {
   323                 report.database_exists = rows.first().map(|r| r.exists == 1).unwrap_or(false)
   324             }
   325             Err(err) => {
   326                 report
   327                     .errors
   328                     .push(format!("database existence query failed: {err}"));
   329                 return Ok(report);
   330             }
   331         }
   332  
   333         if !report.database_exists {
   334             report
   335                 .errors
   336                 .push(format!("database '{}' does not exist", self.cfg.database));
   337             return Ok(report);
   338         }
   339  
   340         match self.applied_migration_versions().await {
   341             Ok(applied) => {
   342                 let mut versions: Vec<String> = applied.into_iter().collect();
   343                 versions.sort();
   344                 report.applied_migrations = versions;
   345             }
   346             Err(err) => report
   347                 .errors
   348                 .push(format!("failed to read migration ledger: {err}")),
   349         }
   350  
   351         let pending = bundled_migrations()
   352             .into_iter()
   353             .filter(|m| !report.applied_migrations.iter().any(|v| v == m.version))
   354             .map(|m| m.version.to_string())
   355             .collect::<Vec<_>>();
   356         report.pending_migrations = pending;
   357  
   358         #[derive(Deserialize)]
   359         struct TableRow {
   360             name: String,
   361         }
   362  
   363         let table_query = format!(
   364             "SELECT name FROM system.tables WHERE database = {}",
   365             escape_literal(&self.cfg.database)
   366         );
   367  
   368         let required = [
   369             "raw_events",
   370             "events",
   371             "event_links",
   372             "tool_io",
   373             "ingest_errors",
   374             "ingest_checkpoints",
   375             "ingest_heartbeats",
   376             "search_documents",
   377             "search_postings",
   378             "search_conversation_terms",
   379             "search_term_stats",
   380             "search_corpus_stats",
   381             "search_query_log",
   382             "search_hit_log",
   383             "search_interaction_log",
   384             "schema_migrations",
   385         ];
   386  
   387         match self
   388             .query_json_data::<TableRow>(&table_query, Some("system"))
   389             .await
   390         {
   391             Ok(rows) => {
   392                 let existing = rows.into_iter().map(|r| r.name).collect::<HashSet<_>>();
   393                 report.missing_tables = required
   394                     .iter()
   395                     .filter(|name| !existing.contains(**name))
   396                     .map(|name| (*name).to_string())
   397                     .collect();
   398             }
   399             Err(err) => report.errors.push(format!("table listing failed: {err}")),
   400         }
   401  
   402         Ok(report)
   403     }
   404  
   405     async fn ensure_migration_ledger(&self) -> Result<()> {
   406         self.request_text(
   407             &format!(
   408                 "CREATE TABLE IF NOT EXISTS {}.schema_migrations (\
   409                  version String, \
   410                  name String, \
   411                  applied_at DateTime64(3) DEFAULT now64(3)\
   412                  ) ENGINE = ReplacingMergeTree(applied_at) \
   413                  ORDER BY (version)",
   414                 escape_identifier(&self.cfg.database)
   415             ),
   416             None,
   417             Some(&self.cfg.database),
   418             false,
   419             None,
   420         )
   421         .await?;
   422  
   423         Ok(())
   424     }
   425  
   426     async fn applied_migration_versions(&self) -> Result<HashSet<String>> {
   427         #[derive(Deserialize)]
   428         struct Row {
   429             version: String,
   430         }
   431  
   432         let query = format!(
   433             "SELECT version FROM {}.schema_migrations GROUP BY version",
   434             escape_identifier(&self.cfg.database)
   435         );
   436  
   437         let rows: Vec<Row> = self
   438             .query_json_data(&query, Some(&self.cfg.database))
   439             .await?;
   440         Ok(rows.into_iter().map(|row| row.version).collect())
   441     }
   442 }
   443  
   444 pub fn bundled_migrations() -> Vec<Migration> {
   445     vec![
   446         Migration {
   447             version: "001",
   448             name: "001_schema.sql",
   449             sql: include_str!("../../../sql/001_schema.sql"),
   450         },
   451         Migration {
   452             version: "002",
   453             name: "002_views.sql",
   454             sql: include_str!("../../../sql/002_views.sql"),
   455         },
   456         Migration {
   457             version: "003",
   458             name: "003_ingest_heartbeats.sql",
   459             sql: include_str!("../../../sql/003_ingest_heartbeats.sql"),
   460         },
   461         Migration {
   462             version: "004",
   463             name: "004_search_index.sql",
   464             sql: include_str!("../../../sql/004_search_index.sql"),
   465         },
   466         Migration {
   467             version: "005",
   468             name: "005_watcher_heartbeat_metrics.sql",
   469             sql: include_str!("../../../sql/005_watcher_heartbeat_metrics.sql"),
   470         },
   471         Migration {
   472             version: "006",
   473             name: "006_search_stats_authoritative_views.sql",
   474             sql: include_str!("../../../sql/006_search_stats_authoritative_views.sql"),
   475         },
   476         Migration {
   477             version: "007",
   478             name: "007_event_links_external_id.sql",
   479             sql: include_str!("../../../sql/007_event_links_external_id.sql"),
   480         },
   481         Migration {
   482             version: "008",
   483             name: "008_categorical_domain_contracts.sql",
   484             sql: include_str!("../../../sql/008_categorical_domain_contracts.sql"),
   485         },
   486         Migration {
   487             version: "009",
   488             name: "009_search_documents_codex_flag.sql",
   489             sql: include_str!("../../../sql/009_search_documents_codex_flag.sql"),
   490         },
   491         Migration {
   492             version: "010",
   493             name: "010_search_conversation_terms.sql",
   494             sql: include_str!("../../../sql/010_search_conversation_terms.sql"),
   495         },
   496     ]
   497 }
   498  
   499 fn truncate_for_error(statement: &str) -> String {
   500     const LIMIT: usize = 240;
   501     let compact = statement.split_whitespace().collect::<Vec<_>>().join(" ");
   502     if compact.len() <= LIMIT {
   503         compact
   504     } else {
   505         let mut boundary = LIMIT;
   506         while !compact.is_char_boundary(boundary) {
   507             boundary -= 1;
   508         }
   509         format!("{}...", &compact[..boundary])
   510     }
   511 }
   512  
   513 fn validate_identifier(identifier: &str) -> Result<()> {
   514     if identifier.is_empty() {
   515         bail!("identifier must not be empty");
   516     }
   517  
   518     let ok = identifier
   519         .chars()
   520         .all(|c| c.is_ascii_alphanumeric() || c == '_');
   521     if !ok {
   522         bail!("identifier contains unsupported characters: {identifier}");
   523     }
   524  
   525     Ok(())
   526 }
   527  
   528 fn materialize_migration_sql(sql: &str, database: &str) -> Result<String> {
   529     validate_identifier(database)?;
   530  
   531     let mut text = sql.to_string();
   532     text = text.replace(
   533         "CREATE DATABASE IF NOT EXISTS moraine;",
   534         &format!("CREATE DATABASE IF NOT EXISTS {database};"),
   535     );
   536     text = text.replace("moraine.", &format!("{database}."));
   537     Ok(text)
   538 }
   539  
   540 fn split_sql_statements(sql: &str) -> Vec<String> {
   541     let mut statements = Vec::new();
   542     let mut current = String::new();
   543     let mut in_single_quote = false;
   544     let mut prev = '\0';
   545  
   546     for line in sql.lines() {
   547         if line.trim_start().starts_with("--") {
   548             continue;
   549         }
   550  
   551         let chars: Vec<char> = line.chars().collect();
   552         let mut idx = 0;
   553         while idx < chars.len() {
   554             let ch = chars[idx];
   555             if ch == '\'' {
   556                 if in_single_quote && idx + 1 < chars.len() && chars[idx + 1] == '\'' {
   557                     current.push(ch);
   558                     current.push(chars[idx + 1]);
   559                     prev = chars[idx + 1];
   560                     idx += 2;
   561                     continue;
   562                 }
   563                 if prev != '\\' {
   564                     in_single_quote = !in_single_quote;
   565                 }
   566             }
   567  
   568             if ch == ';' && !in_single_quote {
   569                 let statement = current.trim();
   570                 if !statement.is_empty() {
   571                     statements.push(statement.to_string());
   572                 }
   573                 current.clear();
   574                 prev = '\0';
   575                 idx += 1;
   576                 continue;
   577             }
   578  
   579             current.push(ch);
   580             prev = ch;
   581             idx += 1;
   582         }
   583  
   584         current.push('\n');
   585     }
   586  
   587     let tail = current.trim();
   588     if !tail.is_empty() {
   589         statements.push(tail.to_string());
   590     }
   591  
   592     statements
   593 }
   594  
   595 fn escape_identifier(identifier: &str) -> String {
   596     format!("`{}`", identifier.replace('`', "``"))
   597 }
   598  
   599 fn escape_literal(value: &str) -> String {
   600     format!("'{}'", value.replace('\\', "\\\\").replace('\'', "\\'"))
   601 }
   602  
   603 fn has_explicit_json_each_row_format(query: &str) -> bool {
   604     let compact = query
   605         .split_whitespace()
   606         .collect::<Vec<_>>()
   607         .join(" ")
   608         .to_ascii_lowercase();
   609     compact.contains(" format jsoneachrow")
   610 }
   611  
   612 #[cfg(test)]
   613 mod tests {
   614     use super::*;
   615     use axum::{
   616         extract::Query,
   617         http::{HeaderMap, StatusCode},
   618         routing::get,
   619         Router,
   620     };
   621     use moraine_config::ClickHouseConfig;
   622     use serde::Deserialize;
   623     use std::collections::HashMap;
   624  
   625     fn test_clickhouse_config(url: String) -> ClickHouseConfig {
   626         ClickHouseConfig {
   627             url,
   628             database: "moraine".to_string(),
   629             username: "default".to_string(),
   630             password: String::new(),
   631             timeout_seconds: 5.0,
   632             async_insert: true,
   633             wait_for_async_insert: true,
   634         }
   635     }
   636  
   637     async fn spawn_mock_server() -> String {
   638         async fn handler(
   639             Query(params): Query<HashMap<String, String>>,
   640             headers: HeaderMap,
   641         ) -> (StatusCode, String) {
   642             if headers.get("content-length").is_none() {
   643                 return (
   644                     StatusCode::LENGTH_REQUIRED,
   645                     "missing content-length".to_string(),
   646                 );
   647             }
   648  
   649             let query = params.get("query").cloned().unwrap_or_default();
   650             if query.contains("FAIL") {
   651                 return (StatusCode::INTERNAL_SERVER_ERROR, "boom".to_string());
   652             }
   653  
   654             if params
   655                 .get("default_format")
   656                 .is_some_and(|fmt| fmt == "JSON")
   657             {
   658                 return (StatusCode::OK, "not-json".to_string());
   659             }
   660  
   661             (StatusCode::OK, "{\"value\":7}\n".to_string())
   662         }
   663  
   664         let app = Router::new().route("/", get(handler).post(handler));
   665         let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
   666             .await
   667             .expect("bind test listener");
   668         let addr = listener.local_addr().expect("listener addr");
   669  
   670         tokio::spawn(async move {
   671             let _ = axum::serve(listener, app).await;
   672         });
   673  
   674         format!("http://{}", addr)
   675     }
   676  
   677     fn spawn_truncated_body_server() -> String {
   678         use std::io::{Read, Write};
   679         use std::net::TcpListener;
   680  
   681         let listener = TcpListener::bind("127.0.0.1:0").expect("bind raw listener");
   682         let addr = listener.local_addr().expect("raw listener addr");
   683  
   684         std::thread::spawn(move || {
   685             if let Ok((mut stream, _)) = listener.accept() {
   686                 let mut request = [0_u8; 4096];
   687                 let _ = stream.read(&mut request);
   688  
   689                 let response = concat!(
   690                     "HTTP/1.1 200 OK\r\n",
   691                     "Content-Type: text/plain; charset=utf-8\r\n",
   692                     "Content-Length: 20\r\n",
   693                     "Connection: close\r\n",
   694                     "\r\n",
   695                     "short",
   696                 );
   697                 let _ = stream.write_all(response.as_bytes());
   698                 let _ = stream.flush();
   699             }
   700         });
   701  
   702         format!("http://{}", addr)
   703     }
   704  
   705     #[test]
   706     fn sql_split_handles_multiple_statements() {
   707         let sql = "CREATE TABLE a (x String);\nINSERT INTO a VALUES ('a;b');\n";
   708         let out = split_sql_statements(sql);
   709         assert_eq!(out.len(), 2);
   710         assert!(out[0].starts_with("CREATE TABLE"));
   711         assert!(out[1].contains("'a;b'"));
   712     }
   713  
   714     #[test]
   715     fn sql_split_handles_sql_standard_escaped_quotes() {
   716         let sql = "INSERT INTO a VALUES ('it''s;fine');\nSELECT 1;\n";
   717         let out = split_sql_statements(sql);
   718         assert_eq!(out.len(), 2);
   719         assert!(out[0].contains("'it''s;fine'"));
   720     }
   721  
   722     #[test]
   723     fn sql_split_handles_escaped_quote_after_backslash() {
   724         let sql = "INSERT INTO a VALUES ('path\\'';still-string');\nSELECT 1;\n";
   725         let out = split_sql_statements(sql);
   726         assert_eq!(
   727             out,
   728             vec![
   729                 "INSERT INTO a VALUES ('path\\'';still-string')".to_string(),
   730                 "SELECT 1".to_string()
   731             ]
   732         );
   733     }
   734  
   735     #[test]
   736     fn sql_materialization_rewrites_database() {
   737         let sql = "CREATE DATABASE IF NOT EXISTS moraine;\nCREATE TABLE moraine.events (x UInt8);";
   738         let out = materialize_migration_sql(sql, "custom_db").expect("should rewrite");
   739         assert!(out.contains("CREATE DATABASE IF NOT EXISTS custom_db;"));
   740         assert!(out.contains("custom_db.events"));
   741     }
   742  
   743     #[test]
   744     fn identifier_validation_rejects_invalid() {
   745         assert!(validate_identifier("moraine_01").is_ok());
   746         assert!(validate_identifier("moraine-db").is_err());
   747     }
   748  
   749     #[test]
   750     fn format_detection_handles_case_and_whitespace() {
   751         assert!(has_explicit_json_each_row_format(
   752             "SELECT 1\nFORMAT JSONEachRow"
   753         ));
   754         assert!(has_explicit_json_each_row_format(
   755             "SELECT 1 format jsoneachrow"
   756         ));
   757         assert!(!has_explicit_json_each_row_format("SELECT 1"));
   758         assert!(!has_explicit_json_each_row_format("SELECT 1 FORMAT JSON"));
   759     }
   760  
   761     #[test]
   762     fn truncate_for_error_handles_multibyte_utf8_boundaries() {
   763         let statement = format!("{}é{}", "a".repeat(239), "b".repeat(10));
   764         let truncated = truncate_for_error(&statement);
   765         assert_eq!(truncated, format!("{}...", "a".repeat(239)));
   766     }
   767  
   768     #[tokio::test(flavor = "multi_thread")]
   769     async fn query_rows_falls_back_to_json_each_row() {
   770         #[derive(Deserialize)]
   771         struct Row {
   772             value: u8,
   773         }
   774  
   775         let base_url = spawn_mock_server().await;
   776         let client = ClickHouseClient::new(test_clickhouse_config(base_url)).expect("new client");
   777  
   778         let rows: Vec<Row> = client
   779             .query_rows("SELECT 7 AS value", None)
   780             .await
   781             .expect("fallback query_rows");
   782         assert_eq!(rows.len(), 1);
   783         assert_eq!(rows[0].value, 7);
   784     }
   785  
   786     #[tokio::test(flavor = "multi_thread")]
   787     async fn request_text_includes_status_and_body_on_http_failure() {
   788         let base_url = spawn_mock_server().await;
   789         let client = ClickHouseClient::new(test_clickhouse_config(base_url)).expect("new client");
   790  
   791         let err = client
   792             .request_text("SELECT FAIL", None, None, false, None)
   793             .await
   794             .expect_err("expected HTTP failure");
   795  
   796         let msg = err.to_string();
   797         assert!(msg.contains("clickhouse returned"));
   798         assert!(msg.contains("500"));
   799         assert!(msg.contains("boom"));
   800     }
   801  
   802     #[tokio::test(flavor = "multi_thread")]
   803     async fn request_text_propagates_response_body_read_errors() {
   804         let base_url = spawn_truncated_body_server();
   805         let client = ClickHouseClient::new(test_clickhouse_config(base_url)).expect("new client");
   806  
   807         let err = client
   808             .request_text("SELECT 1", None, None, false, None)
   809             .await
   810             .expect_err("expected response body read failure");
   811  
   812         let msg = err.to_string();
   813         assert!(msg.contains("failed to read clickhouse response body"));
   814     }
   815 }