crates/moraine-clickhouse/src/lib.rs
1 use anyhow::{anyhow, bail, Context, Result};
2 use moraine_config::ClickHouseConfig;
3 use reqwest::{
4 header::{CONTENT_LENGTH, CONTENT_TYPE},
5 Client, Url,
6 };
7 use serde::de::DeserializeOwned;
8 use serde::Deserialize;
9 use serde_json::Value;
10 use std::collections::HashSet;
11 use std::time::Duration;
12
/// Async client for the ClickHouse HTTP interface.
#[derive(Clone)]
pub struct ClickHouseClient {
    /// Connection settings: URL, credentials, default database, timeout,
    /// and async-insert behavior.
    cfg: ClickHouseConfig,
    /// Reusable HTTP client configured with the request timeout.
    http: Client,
}
18
/// Minimal shape of a ClickHouse `FORMAT JSON` response envelope; only the
/// `data` rows are deserialized, all other envelope fields are ignored.
#[derive(Deserialize)]
struct ClickHouseEnvelope<T> {
    data: Vec<T>,
}
23
/// A single schema migration bundled into the binary.
#[derive(Debug, Clone)]
pub struct Migration {
    /// Ordered version key recorded in `schema_migrations` (e.g. "001").
    pub version: &'static str,
    /// Human-readable migration file name (e.g. "001_schema.sql").
    pub name: &'static str,
    /// Raw SQL embedded at compile time via `include_str!`.
    pub sql: &'static str,
}
30
/// Aggregated health/status snapshot produced by
/// [`ClickHouseClient::doctor_report`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct DoctorReport {
    /// True when a `SELECT 1` ping succeeded.
    pub clickhouse_healthy: bool,
    /// Server version string, when the version query succeeded.
    pub clickhouse_version: Option<String>,
    /// The database this client is configured to use.
    pub database: String,
    /// True when that database exists on the server.
    pub database_exists: bool,
    /// Sorted versions found in the `schema_migrations` ledger.
    pub applied_migrations: Vec<String>,
    /// Bundled migration versions not yet recorded as applied.
    pub pending_migrations: Vec<String>,
    /// Required tables absent from the database.
    pub missing_tables: Vec<String>,
    /// Non-fatal problems encountered while probing.
    pub errors: Vec<String>,
}
42
impl ClickHouseClient {
    /// Build a client from configuration.
    ///
    /// The HTTP timeout is clamped to at least one second so a zero or
    /// negative `timeout_seconds` cannot produce an unusable client.
    pub fn new(cfg: ClickHouseConfig) -> Result<Self> {
        let timeout = Duration::from_secs_f64(cfg.timeout_seconds.max(1.0));
        let http = Client::builder()
            .timeout(timeout)
            .build()
            .context("failed to construct reqwest client")?;

        Ok(Self { cfg, http })
    }

    /// Borrow the configuration this client was constructed with.
    pub fn config(&self) -> &ClickHouseConfig {
        &self.cfg
    }

    /// Parse the configured ClickHouse endpoint URL.
    fn base_url(&self) -> Result<Url> {
        Url::parse(&self.cfg.url).context("invalid ClickHouse URL")
    }

    /// Execute `query` over the ClickHouse HTTP interface and return the raw
    /// response body as text.
    ///
    /// * `body` — optional POST payload (e.g. JSONEachRow lines for inserts).
    /// * `database` — value for the `database` query parameter, when set.
    /// * `async_insert` — request async inserts; only honored when the
    ///   configuration also enables `async_insert`.
    /// * `default_format` — value for the `default_format` query parameter.
    ///
    /// Errors when the request fails, the body cannot be read, or ClickHouse
    /// responds with a non-2xx status (status and body are included in the
    /// error message).
    pub async fn request_text(
        &self,
        query: &str,
        body: Option<Vec<u8>>,
        database: Option<&str>,
        async_insert: bool,
        default_format: Option<&str>,
    ) -> Result<String> {
        let mut url = self.base_url()?;
        {
            // Scope the mutable borrow of the query-pair serializer.
            let mut qp = url.query_pairs_mut();
            qp.append_pair("query", query);
            if let Some(database) = database {
                qp.append_pair("database", database);
            }
            if let Some(default_format) = default_format {
                qp.append_pair("default_format", default_format);
            }
            if async_insert && self.cfg.async_insert {
                qp.append_pair("async_insert", "1");
                if self.cfg.wait_for_async_insert {
                    qp.append_pair("wait_for_async_insert", "1");
                }
            }
        }

        // ClickHouse HTTP treats GET as readonly, so use POST for both reads and writes.
        let payload = body.unwrap_or_default();
        let payload_len = payload.len();

        let mut req = self
            .http
            .post(url)
            .header(CONTENT_TYPE, "text/plain; charset=utf-8")
            // Some ClickHouse builds require an explicit Content-Length on POST.
            .header(CONTENT_LENGTH, payload_len)
            .body(payload);

        if !self.cfg.username.is_empty() {
            req = req.basic_auth(self.cfg.username.clone(), Some(self.cfg.password.clone()));
        }

        let response = req.send().await.context("clickhouse request failed")?;
        let status = response.status();
        // Read the body before checking the status so error responses can
        // carry ClickHouse's diagnostic text.
        let text = response.text().await.with_context(|| {
            format!(
                "failed to read clickhouse response body (status {})",
                status
            )
        })?;

        if !status.is_success() {
            return Err(anyhow!("clickhouse returned {}: {}", status, text));
        }

        Ok(text)
    }

    /// Health check: run `SELECT 1` against the `system` database and require
    /// the response body to be exactly "1".
    pub async fn ping(&self) -> Result<()> {
        let response = self
            .request_text("SELECT 1", None, Some("system"), false, None)
            .await?;
        if response.trim() == "1" {
            Ok(())
        } else {
            Err(anyhow!("unexpected ping response: {}", response.trim()))
        }
    }

    /// Fetch the server version via `SELECT version()`.
    pub async fn version(&self) -> Result<String> {
        let rows: Vec<Value> = self
            .query_json_data("SELECT version() AS version", Some("system"))
            .await?;
        let version = rows
            .first()
            .and_then(|row| row.get("version"))
            .and_then(Value::as_str)
            .ok_or_else(|| anyhow!("missing version in payload"))?;

        Ok(version.to_string())
    }

    /// Run `query` and parse the response as JSONEachRow: one JSON object per
    /// line, blank lines skipped. Defaults to the configured database when
    /// `database` is `None`.
    pub async fn query_json_each_row<T: DeserializeOwned>(
        &self,
        query: &str,
        database: Option<&str>,
    ) -> Result<Vec<T>> {
        let database = database.or(Some(&self.cfg.database));
        let raw = self
            .request_text(query, None, database, false, None)
            .await?;
        let mut rows = Vec::new();

        for line in raw.lines() {
            if line.trim().is_empty() {
                continue;
            }
            let row = serde_json::from_str::<T>(line)
                .with_context(|| format!("failed to parse JSONEachRow line: {}", line))?;
            rows.push(row);
        }

        Ok(rows)
    }

    /// Run `query` with `default_format=JSON` and return the envelope's
    /// `data` rows. Defaults to the configured database when `database` is
    /// `None`.
    pub async fn query_json_data<T: DeserializeOwned>(
        &self,
        query: &str,
        database: Option<&str>,
    ) -> Result<Vec<T>> {
        let database = database.or(Some(&self.cfg.database));
        let raw = self
            .request_text(query, None, database, false, Some("JSON"))
            .await?;
        let envelope: ClickHouseEnvelope<T> = serde_json::from_str(&raw)
            .with_context(|| format!("invalid clickhouse JSON response: {}", raw))?;
        Ok(envelope.data)
    }

    /// Run a SELECT-style query, preferring the JSON envelope format and
    /// falling back to JSONEachRow parsing.
    ///
    /// Queries that already carry an explicit `FORMAT JSONEachRow` clause go
    /// straight to the JSONEachRow path. NOTE: the fallback re-issues the
    /// query, so this is only appropriate for idempotent reads.
    pub async fn query_rows<T: DeserializeOwned>(
        &self,
        query: &str,
        database: Option<&str>,
    ) -> Result<Vec<T>> {
        if has_explicit_json_each_row_format(query) {
            return self.query_json_each_row(query, database).await;
        }

        match self.query_json_data(query, database).await {
            Ok(rows) => Ok(rows),
            Err(_) => self.query_json_each_row(query, database).await,
        }
    }

    /// Insert `rows` into `table` (in the configured database) as
    /// newline-delimited JSONEachRow. A no-op when `rows` is empty.
    /// Passes `async_insert = true`, so async-insert settings apply when
    /// enabled in the configuration.
    pub async fn insert_json_rows(&self, table: &str, rows: &[Value]) -> Result<()> {
        if rows.is_empty() {
            return Ok(());
        }

        let mut payload = Vec::<u8>::new();
        for row in rows {
            let line = serde_json::to_vec(row).context("failed to encode JSON row")?;
            payload.extend_from_slice(&line);
            payload.push(b'\n');
        }

        let query = format!(
            "INSERT INTO {}.{} FORMAT JSONEachRow",
            escape_identifier(&self.cfg.database),
            escape_identifier(table)
        );
        self.request_text(&query, Some(payload), None, true, None)
            .await?;
        Ok(())
    }

    /// Apply all bundled migrations that are not yet recorded in the
    /// `schema_migrations` ledger, creating the database and ledger first.
    ///
    /// Returns the versions executed during this call. Each migration's SQL
    /// is rewritten to target the configured database, split into individual
    /// statements, and executed one by one; the ledger row is written only
    /// after every statement of a migration succeeds.
    pub async fn run_migrations(&self) -> Result<Vec<String>> {
        validate_identifier(&self.cfg.database)?;

        self.request_text(
            &format!(
                "CREATE DATABASE IF NOT EXISTS {}",
                escape_identifier(&self.cfg.database)
            ),
            None,
            None,
            false,
            None,
        )
        .await?;

        self.ensure_migration_ledger().await?;
        let applied = self.applied_migration_versions().await?;

        let mut executed = Vec::new();
        for migration in bundled_migrations() {
            if applied.contains(migration.version) {
                continue;
            }

            let sql = materialize_migration_sql(migration.sql, &self.cfg.database)?;
            for statement in split_sql_statements(&sql) {
                self.request_text(&statement, None, Some(&self.cfg.database), false, None)
                    .await
                    .with_context(|| {
                        format!(
                            "failed migration {} statement: {}",
                            migration.name,
                            truncate_for_error(&statement)
                        )
                    })?;
            }

            let log_stmt = format!(
                "INSERT INTO {}.schema_migrations (version, name) VALUES ({}, {})",
                escape_identifier(&self.cfg.database),
                escape_literal(migration.version),
                escape_literal(migration.name)
            );
            self.request_text(&log_stmt, None, Some(&self.cfg.database), false, None)
                .await
                .with_context(|| format!("failed to record migration {}", migration.name))?;

            executed.push(migration.version.to_string());
        }

        Ok(executed)
    }

    /// List bundled migration versions not yet recorded as applied,
    /// creating the ledger table if needed.
    pub async fn pending_migration_versions(&self) -> Result<Vec<String>> {
        self.ensure_migration_ledger().await?;
        let applied = self.applied_migration_versions().await?;
        Ok(bundled_migrations()
            .into_iter()
            .filter(|m| !applied.contains(m.version))
            .map(|m| m.version.to_string())
            .collect())
    }

    /// Probe connectivity, database existence, migration state, and required
    /// tables, collecting problems into `errors` instead of failing.
    ///
    /// Returns early (with the partial report) when the server is
    /// unreachable, the existence query fails, or the database is missing —
    /// later probes would be meaningless in those cases.
    pub async fn doctor_report(&self) -> Result<DoctorReport> {
        let mut report = DoctorReport {
            clickhouse_healthy: false,
            clickhouse_version: None,
            database: self.cfg.database.clone(),
            database_exists: false,
            applied_migrations: Vec::new(),
            pending_migrations: Vec::new(),
            missing_tables: Vec::new(),
            errors: Vec::new(),
        };

        match self.ping().await {
            Ok(()) => {
                report.clickhouse_healthy = true;
            }
            Err(err) => {
                report.errors.push(format!("ping failed: {err}"));
                return Ok(report);
            }
        }

        match self.version().await {
            Ok(version) => report.clickhouse_version = Some(version),
            Err(err) => report.errors.push(format!("version query failed: {err}")),
        }

        #[derive(Deserialize)]
        struct ExistsRow {
            exists: u8,
        }

        let exists_query = format!(
            "SELECT toUInt8(count() > 0) AS exists FROM system.databases WHERE name = {}",
            escape_literal(&self.cfg.database)
        );

        match self
            .query_json_data::<ExistsRow>(&exists_query, Some("system"))
            .await
        {
            Ok(rows) => {
                report.database_exists = rows.first().map(|r| r.exists == 1).unwrap_or(false)
            }
            Err(err) => {
                report
                    .errors
                    .push(format!("database existence query failed: {err}"));
                return Ok(report);
            }
        }

        if !report.database_exists {
            report
                .errors
                .push(format!("database '{}' does not exist", self.cfg.database));
            return Ok(report);
        }

        match self.applied_migration_versions().await {
            Ok(applied) => {
                let mut versions: Vec<String> = applied.into_iter().collect();
                versions.sort();
                report.applied_migrations = versions;
            }
            Err(err) => report
                .errors
                .push(format!("failed to read migration ledger: {err}")),
        }

        let pending = bundled_migrations()
            .into_iter()
            .filter(|m| !report.applied_migrations.iter().any(|v| v == m.version))
            .map(|m| m.version.to_string())
            .collect::<Vec<_>>();
        report.pending_migrations = pending;

        #[derive(Deserialize)]
        struct TableRow {
            name: String,
        }

        let table_query = format!(
            "SELECT name FROM system.tables WHERE database = {}",
            escape_literal(&self.cfg.database)
        );

        // Tables every healthy deployment is expected to have.
        let required = [
            "raw_events",
            "events",
            "event_links",
            "tool_io",
            "ingest_errors",
            "ingest_checkpoints",
            "ingest_heartbeats",
            "search_documents",
            "search_postings",
            "search_conversation_terms",
            "search_term_stats",
            "search_corpus_stats",
            "search_query_log",
            "search_hit_log",
            "search_interaction_log",
            "schema_migrations",
        ];

        match self
            .query_json_data::<TableRow>(&table_query, Some("system"))
            .await
        {
            Ok(rows) => {
                let existing = rows.into_iter().map(|r| r.name).collect::<HashSet<_>>();
                report.missing_tables = required
                    .iter()
                    .filter(|name| !existing.contains(**name))
                    .map(|name| (*name).to_string())
                    .collect();
            }
            Err(err) => report.errors.push(format!("table listing failed: {err}")),
        }

        Ok(report)
    }

    /// Create the `schema_migrations` ledger table if it does not exist.
    async fn ensure_migration_ledger(&self) -> Result<()> {
        self.request_text(
            &format!(
                "CREATE TABLE IF NOT EXISTS {}.schema_migrations (\
                 version String, \
                 name String, \
                 applied_at DateTime64(3) DEFAULT now64(3)\
                 ) ENGINE = ReplacingMergeTree(applied_at) \
                 ORDER BY (version)",
                escape_identifier(&self.cfg.database)
            ),
            None,
            Some(&self.cfg.database),
            false,
            None,
        )
        .await?;

        Ok(())
    }

    /// Read the set of applied migration versions from the ledger.
    ///
    /// `GROUP BY version` collapses duplicates the ReplacingMergeTree engine
    /// may not have merged yet.
    async fn applied_migration_versions(&self) -> Result<HashSet<String>> {
        #[derive(Deserialize)]
        struct Row {
            version: String,
        }

        let query = format!(
            "SELECT version FROM {}.schema_migrations GROUP BY version",
            escape_identifier(&self.cfg.database)
        );

        let rows: Vec<Row> = self
            .query_json_data(&query, Some(&self.cfg.database))
            .await?;
        Ok(rows.into_iter().map(|row| row.version).collect())
    }
}
443
444 pub fn bundled_migrations() -> Vec<Migration> {
445 vec![
446 Migration {
447 version: "001",
448 name: "001_schema.sql",
449 sql: include_str!("../../../sql/001_schema.sql"),
450 },
451 Migration {
452 version: "002",
453 name: "002_views.sql",
454 sql: include_str!("../../../sql/002_views.sql"),
455 },
456 Migration {
457 version: "003",
458 name: "003_ingest_heartbeats.sql",
459 sql: include_str!("../../../sql/003_ingest_heartbeats.sql"),
460 },
461 Migration {
462 version: "004",
463 name: "004_search_index.sql",
464 sql: include_str!("../../../sql/004_search_index.sql"),
465 },
466 Migration {
467 version: "005",
468 name: "005_watcher_heartbeat_metrics.sql",
469 sql: include_str!("../../../sql/005_watcher_heartbeat_metrics.sql"),
470 },
471 Migration {
472 version: "006",
473 name: "006_search_stats_authoritative_views.sql",
474 sql: include_str!("../../../sql/006_search_stats_authoritative_views.sql"),
475 },
476 Migration {
477 version: "007",
478 name: "007_event_links_external_id.sql",
479 sql: include_str!("../../../sql/007_event_links_external_id.sql"),
480 },
481 Migration {
482 version: "008",
483 name: "008_categorical_domain_contracts.sql",
484 sql: include_str!("../../../sql/008_categorical_domain_contracts.sql"),
485 },
486 Migration {
487 version: "009",
488 name: "009_search_documents_codex_flag.sql",
489 sql: include_str!("../../../sql/009_search_documents_codex_flag.sql"),
490 },
491 Migration {
492 version: "010",
493 name: "010_search_conversation_terms.sql",
494 sql: include_str!("../../../sql/010_search_conversation_terms.sql"),
495 },
496 ]
497 }
498
/// Collapse whitespace runs in `statement` to single spaces and cap the
/// result at 240 bytes for use in error messages, appending "..." when
/// truncated.
///
/// The cut point backs up to the nearest UTF-8 character boundary so slicing
/// can never panic on multibyte input.
fn truncate_for_error(statement: &str) -> String {
    const LIMIT: usize = 240;

    let compact = statement.split_whitespace().collect::<Vec<_>>().join(" ");
    if compact.len() <= LIMIT {
        return compact;
    }

    let boundary = (0..=LIMIT)
        .rev()
        .find(|&idx| compact.is_char_boundary(idx))
        .expect("index 0 is always a char boundary");
    format!("{}...", &compact[..boundary])
}
512
513 fn validate_identifier(identifier: &str) -> Result<()> {
514 if identifier.is_empty() {
515 bail!("identifier must not be empty");
516 }
517
518 let ok = identifier
519 .chars()
520 .all(|c| c.is_ascii_alphanumeric() || c == '_');
521 if !ok {
522 bail!("identifier contains unsupported characters: {identifier}");
523 }
524
525 Ok(())
526 }
527
528 fn materialize_migration_sql(sql: &str, database: &str) -> Result<String> {
529 validate_identifier(database)?;
530
531 let mut text = sql.to_string();
532 text = text.replace(
533 "CREATE DATABASE IF NOT EXISTS moraine;",
534 &format!("CREATE DATABASE IF NOT EXISTS {database};"),
535 );
536 text = text.replace("moraine.", &format!("{database}."));
537 Ok(text)
538 }
539
/// Split a migration script into individual statements at top-level
/// semicolons, skipping full-line `--` comments and keeping semicolons that
/// appear inside single-quoted string literals.
///
/// Handles both SQL-standard doubled quotes (`''`) and backslash-escaped
/// quotes (`\'`) inside literals. Fixes two escape-tracking bugs in the
/// previous version: an escaped backslash pair (`\\`) no longer makes the
/// *following* quote look escaped (so a literal ending in `\\` closes
/// correctly), and a backslash outside any literal no longer suppresses the
/// next quote toggle.
fn split_sql_statements(sql: &str) -> Vec<String> {
    let mut statements = Vec::new();
    let mut current = String::new();
    let mut in_single_quote = false;
    // Previous significant character, used to detect backslash escapes. A
    // consumed escape pair resets this to '\0' so it cannot escape what
    // follows.
    let mut prev = '\0';

    for line in sql.lines() {
        // Drop full-line SQL comments.
        if line.trim_start().starts_with("--") {
            continue;
        }

        let chars: Vec<char> = line.chars().collect();
        let mut idx = 0;
        while idx < chars.len() {
            let ch = chars[idx];
            if ch == '\'' {
                // SQL-standard doubled quote inside a literal stays in the
                // literal (checked before the backslash rule, matching the
                // behavior pinned by the existing tests).
                if in_single_quote && idx + 1 < chars.len() && chars[idx + 1] == '\'' {
                    current.push(ch);
                    current.push(chars[idx + 1]);
                    prev = chars[idx + 1];
                    idx += 2;
                    continue;
                }
                // A backslash escape only applies inside a string literal.
                if !(in_single_quote && prev == '\\') {
                    in_single_quote = !in_single_quote;
                }
            }

            if ch == ';' && !in_single_quote {
                let statement = current.trim();
                if !statement.is_empty() {
                    statements.push(statement.to_string());
                }
                current.clear();
                prev = '\0';
                idx += 1;
                continue;
            }

            current.push(ch);
            // Collapse an escaped backslash pair: after "\\" the next
            // character is NOT escaped.
            prev = if ch == '\\' && prev == '\\' { '\0' } else { ch };
            idx += 1;
        }

        current.push('\n');
    }

    // Final statement may lack a trailing semicolon.
    let tail = current.trim();
    if !tail.is_empty() {
        statements.push(tail.to_string());
    }

    statements
}
594
/// Quote an identifier with backticks, doubling any embedded backtick.
fn escape_identifier(identifier: &str) -> String {
    let mut quoted = String::with_capacity(identifier.len() + 2);
    quoted.push('`');
    for ch in identifier.chars() {
        if ch == '`' {
            quoted.push('`');
        }
        quoted.push(ch);
    }
    quoted.push('`');
    quoted
}
598
/// Quote a string literal for SQL, backslash-escaping backslashes and
/// single quotes.
fn escape_literal(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        match ch {
            '\\' => quoted.push_str("\\\\"),
            '\'' => quoted.push_str("\\'"),
            other => quoted.push(other),
        }
    }
    quoted.push('\'');
    quoted
}
602
/// Detect an explicit `FORMAT JSONEachRow` clause in `query`, tolerating any
/// casing and any whitespace (including newlines) between tokens.
fn has_explicit_json_each_row_format(query: &str) -> bool {
    let mut compact = String::with_capacity(query.len());
    for token in query.split_whitespace() {
        if !compact.is_empty() {
            compact.push(' ');
        }
        compact.push_str(token);
    }
    compact
        .to_ascii_lowercase()
        .contains(" format jsoneachrow")
}
611
612 #[cfg(test)]
613 mod tests {
614 use super::*;
615 use axum::{
616 extract::Query,
617 http::{HeaderMap, StatusCode},
618 routing::get,
619 Router,
620 };
621 use moraine_config::ClickHouseConfig;
622 use serde::Deserialize;
623 use std::collections::HashMap;
624
    /// Config fixture pointing at `url` with async inserts enabled, so the
    /// client exercises the async-insert query parameters in tests.
    fn test_clickhouse_config(url: String) -> ClickHouseConfig {
        ClickHouseConfig {
            url,
            database: "moraine".to_string(),
            username: "default".to_string(),
            password: String::new(),
            timeout_seconds: 5.0,
            async_insert: true,
            wait_for_async_insert: true,
        }
    }
636
    /// Spawn an in-process axum server that mimics the ClickHouse HTTP API:
    /// * 411 when the request lacks a Content-Length header,
    /// * 500 "boom" when the query contains "FAIL",
    /// * a non-JSON body when `default_format=JSON` is requested, which
    ///   forces `query_rows` onto its JSONEachRow fallback path,
    /// * otherwise a single JSONEachRow line `{"value":7}`.
    ///
    /// Returns the server's base URL; the server task runs detached for the
    /// lifetime of the test runtime.
    async fn spawn_mock_server() -> String {
        async fn handler(
            Query(params): Query<HashMap<String, String>>,
            headers: HeaderMap,
        ) -> (StatusCode, String) {
            if headers.get("content-length").is_none() {
                return (
                    StatusCode::LENGTH_REQUIRED,
                    "missing content-length".to_string(),
                );
            }

            let query = params.get("query").cloned().unwrap_or_default();
            if query.contains("FAIL") {
                return (StatusCode::INTERNAL_SERVER_ERROR, "boom".to_string());
            }

            if params
                .get("default_format")
                .is_some_and(|fmt| fmt == "JSON")
            {
                return (StatusCode::OK, "not-json".to_string());
            }

            (StatusCode::OK, "{\"value\":7}\n".to_string())
        }

        let app = Router::new().route("/", get(handler).post(handler));
        // Port 0 lets the OS choose a free port; the real address is read back.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind test listener");
        let addr = listener.local_addr().expect("listener addr");

        tokio::spawn(async move {
            let _ = axum::serve(listener, app).await;
        });

        format!("http://{}", addr)
    }
676
    /// Spawn a raw TCP server (one connection) that replies with an HTTP
    /// response declaring `Content-Length: 20` but sending only 5 body bytes
    /// before closing — forcing the client's body read to fail.
    ///
    /// Returns the server's base URL; the accept loop runs on a plain OS
    /// thread since it uses blocking I/O.
    fn spawn_truncated_body_server() -> String {
        use std::io::{Read, Write};
        use std::net::TcpListener;

        let listener = TcpListener::bind("127.0.0.1:0").expect("bind raw listener");
        let addr = listener.local_addr().expect("raw listener addr");

        std::thread::spawn(move || {
            if let Ok((mut stream, _)) = listener.accept() {
                // Drain (part of) the request; the content is irrelevant.
                let mut request = [0_u8; 4096];
                let _ = stream.read(&mut request);

                let response = concat!(
                    "HTTP/1.1 200 OK\r\n",
                    "Content-Type: text/plain; charset=utf-8\r\n",
                    "Content-Length: 20\r\n",
                    "Connection: close\r\n",
                    "\r\n",
                    "short",
                );
                let _ = stream.write_all(response.as_bytes());
                let _ = stream.flush();
            }
        });

        format!("http://{}", addr)
    }
704
705 #[test]
706 fn sql_split_handles_multiple_statements() {
707 let sql = "CREATE TABLE a (x String);\nINSERT INTO a VALUES ('a;b');\n";
708 let out = split_sql_statements(sql);
709 assert_eq!(out.len(), 2);
710 assert!(out[0].starts_with("CREATE TABLE"));
711 assert!(out[1].contains("'a;b'"));
712 }
713
714 #[test]
715 fn sql_split_handles_sql_standard_escaped_quotes() {
716 let sql = "INSERT INTO a VALUES ('it''s;fine');\nSELECT 1;\n";
717 let out = split_sql_statements(sql);
718 assert_eq!(out.len(), 2);
719 assert!(out[0].contains("'it''s;fine'"));
720 }
721
    /// The literal here is `'path\'';still-string'`: a backslash followed by
    /// a doubled quote. The splitter must stay inside the string, so the
    /// embedded semicolon does not terminate the statement.
    #[test]
    fn sql_split_handles_escaped_quote_after_backslash() {
        let sql = "INSERT INTO a VALUES ('path\\'';still-string');\nSELECT 1;\n";
        let out = split_sql_statements(sql);
        assert_eq!(
            out,
            vec![
                "INSERT INTO a VALUES ('path\\'';still-string')".to_string(),
                "SELECT 1".to_string()
            ]
        );
    }
734
735 #[test]
736 fn sql_materialization_rewrites_database() {
737 let sql = "CREATE DATABASE IF NOT EXISTS moraine;\nCREATE TABLE moraine.events (x UInt8);";
738 let out = materialize_migration_sql(sql, "custom_db").expect("should rewrite");
739 assert!(out.contains("CREATE DATABASE IF NOT EXISTS custom_db;"));
740 assert!(out.contains("custom_db.events"));
741 }
742
743 #[test]
744 fn identifier_validation_rejects_invalid() {
745 assert!(validate_identifier("moraine_01").is_ok());
746 assert!(validate_identifier("moraine-db").is_err());
747 }
748
749 #[test]
750 fn format_detection_handles_case_and_whitespace() {
751 assert!(has_explicit_json_each_row_format(
752 "SELECT 1\nFORMAT JSONEachRow"
753 ));
754 assert!(has_explicit_json_each_row_format(
755 "SELECT 1 format jsoneachrow"
756 ));
757 assert!(!has_explicit_json_each_row_format("SELECT 1"));
758 assert!(!has_explicit_json_each_row_format("SELECT 1 FORMAT JSON"));
759 }
760
761 #[test]
762 fn truncate_for_error_handles_multibyte_utf8_boundaries() {
763 let statement = format!("{}é{}", "a".repeat(239), "b".repeat(10));
764 let truncated = truncate_for_error(&statement);
765 assert_eq!(truncated, format!("{}...", "a".repeat(239)));
766 }
767
    /// The mock server returns a non-JSON body for `default_format=JSON`
    /// requests, so `query_rows` must fall back to re-querying in
    /// JSONEachRow and still produce the row `{"value":7}`.
    #[tokio::test(flavor = "multi_thread")]
    async fn query_rows_falls_back_to_json_each_row() {
        #[derive(Deserialize)]
        struct Row {
            value: u8,
        }

        let base_url = spawn_mock_server().await;
        let client = ClickHouseClient::new(test_clickhouse_config(base_url)).expect("new client");

        let rows: Vec<Row> = client
            .query_rows("SELECT 7 AS value", None)
            .await
            .expect("fallback query_rows");
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].value, 7);
    }
785
    /// A non-2xx response must surface as an error that carries both the
    /// HTTP status and the response body (the mock returns 500 "boom" for
    /// queries containing "FAIL").
    #[tokio::test(flavor = "multi_thread")]
    async fn request_text_includes_status_and_body_on_http_failure() {
        let base_url = spawn_mock_server().await;
        let client = ClickHouseClient::new(test_clickhouse_config(base_url)).expect("new client");

        let err = client
            .request_text("SELECT FAIL", None, None, false, None)
            .await
            .expect_err("expected HTTP failure");

        let msg = err.to_string();
        assert!(msg.contains("clickhouse returned"));
        assert!(msg.contains("500"));
        assert!(msg.contains("boom"));
    }
801
    /// A response whose body is shorter than its declared Content-Length
    /// must fail while *reading the body* (not at status handling), and the
    /// error context must say so.
    #[tokio::test(flavor = "multi_thread")]
    async fn request_text_propagates_response_body_read_errors() {
        let base_url = spawn_truncated_body_server();
        let client = ClickHouseClient::new(test_clickhouse_config(base_url)).expect("new client");

        let err = client
            .request_text("SELECT 1", None, None, false, None)
            .await
            .expect_err("expected response body read failure");

        let msg = err.to_string();
        assert!(msg.contains("failed to read clickhouse response body"));
    }
815 }