diff --git a/Cargo.lock b/Cargo.lock index 71c4edadf2..a9143cbaf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3865,6 +3865,7 @@ dependencies = [ "strum", "tempfile", "tokio", + "tracing", ] [[package]] diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index c63e3061bd..9782641fc5 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -34,6 +34,7 @@ async-trait = { workspace = true } iceberg = { workspace = true } sqlx = { version = "0.8.1", features = ["any"], default-features = false } strum = { workspace = true } +tracing = { workspace = true } [dev-dependencies] itertools = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c7bf9d0cfd..0f12043a67 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -40,7 +40,13 @@ pub const SQL_CATALOG_PROP_URI: &str = "uri"; /// catalog warehouse location pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; /// catalog sql bind style -pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style"; +pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql.bind-style"; +/// Legacy (pre-`sql.bind-style`) key for [`SQL_CATALOG_PROP_BIND_STYLE`], still accepted for +/// backward compatibility. +const SQL_CATALOG_PROP_BIND_STYLE_LEGACY: &str = "sql_bind_style"; +/// Expected catalog schema version. +/// If set to `V1` while the catalog table is actually `V0`, it will be migrated from `V0` to `V1`. 
+pub const SQL_CATALOG_PROP_SCHEMA_VERSION: &str = "sql.schema-version"; static CATALOG_TABLE_NAME: &str = "iceberg_tables"; static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name"; @@ -78,6 +84,7 @@ impl Default for SqlCatalogBuilder { name: "".to_string(), warehouse_location: "".to_string(), sql_bind_style: SqlBindStyle::DollarNumeric, + schema_version: None, props: HashMap::new(), }, storage_factory: None, @@ -169,7 +176,16 @@ impl CatalogBuilder for SqlCatalogBuilder { let name = name.into(); let mut valid_sql_bind_style = true; - if let Some(sql_bind_style) = self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE) { + + // Accept the preferred `sql.bind-style` key, falling back to the legacy `sql_bind_style`. + let sql_bind_style = self + .config + .props + .remove(SQL_CATALOG_PROP_BIND_STYLE) + .or_else(|| self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE_LEGACY)); + + // Validate the SQL bind style + if let Some(sql_bind_style) = sql_bind_style { if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) { self.config.sql_bind_style = sql_bind_style; } else { @@ -177,6 +193,16 @@ impl CatalogBuilder for SqlCatalogBuilder { } } + // Parse the requested schema version up front so invalid values fail fast rather than + // silently falling back to V0. 
+ let mut valid_schema_version = true; + if let Some(schema_version) = self.config.props.remove(SQL_CATALOG_PROP_SCHEMA_VERSION) { + match SchemaVersion::from_str(&schema_version) { + Ok(schema_version) => self.config.schema_version = Some(schema_version), + Err(_) => valid_schema_version = false, + } + } + let valid_name = !name.trim().is_empty(); async move { @@ -195,6 +221,16 @@ impl CatalogBuilder for SqlCatalogBuilder { SqlBindStyle::QMark ), )) + } else if !valid_schema_version { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "`{}` values are valid only if they're `{}` or `{}`", + SQL_CATALOG_PROP_SCHEMA_VERSION, + SchemaVersion::V0, + SchemaVersion::V1 + ), + )) } else { self.config.name = name; let runtime = match self.runtime { @@ -221,6 +257,7 @@ struct SqlCatalogConfig { name: String, warehouse_location: String, sql_bind_style: SqlBindStyle, + schema_version: Option<SchemaVersion>, props: HashMap<String, String>, } @@ -233,6 +270,43 @@ pub struct SqlCatalog { fileio: FileIO, sql_bind_style: SqlBindStyle, runtime: Runtime, + schema_version: SchemaVersion, +} + +#[derive(Debug, Clone, Copy, PartialEq, strum::EnumString, strum::Display)] +#[strum(ascii_case_insensitive)] +/// Schema version of the `iceberg_tables` catalog table. +pub enum SchemaVersion { + /// Original schema without the `iceberg_type` column. + V0, + /// Extended schema with the `iceberg_type` column for view support. + V1, +} + +impl SchemaVersion { + /// The trailing SQL `AND` clause used to exclude view rows when querying for tables. + /// + /// `V1` schemas carry an `iceberg_type` column, so table rows are those tagged `TABLE` + /// (or `NULL`, for rows written before the column existed). `V0` schemas have no such + /// column, so no filter is applied. + fn record_type_filter(self) -> &'static str { + match self { + SchemaVersion::V1 => "AND (iceberg_type = 'TABLE' OR iceberg_type IS NULL)", + SchemaVersion::V0 => "", + } + } + + /// The SQL needed to migrate a `V0` catalog table up to this schema version. 
+ /// + /// Returns `None` when the target version requires no migration (i.e. `V0`). + fn migration_sql(self) -> Option<String> { + match self { + SchemaVersion::V1 => Some(format!( + "ALTER TABLE {CATALOG_TABLE_NAME} ADD COLUMN {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5)" + )), + SchemaVersion::V0 => None, + } + } } #[derive(Debug, PartialEq, strum::EnumString, strum::Display)] @@ -310,6 +384,46 @@ impl SqlCatalog { .await .map_err(from_sqlx_error)?; + // Probe for the `iceberg_type` column to detect whether the catalog table is already a schema version v1 (which supports views). + let is_v1 = match sqlx::query(&format!( + "SELECT {CATALOG_FIELD_RECORD_TYPE} FROM {CATALOG_TABLE_NAME} LIMIT 0" + )) + .execute(&pool) + .await + { + Ok(_) => true, + // The database rejected the query: the `iceberg_type` column (or table) is absent, + // so this is a genuine V0 schema. + Err(sqlx::Error::Database(_)) => false, + // Any other error (connection dropped, pool timeout, misconfiguration, ...) is not a + // signal about the schema version. Surface it rather than misclassifying as V0. + Err(e) => return Err(from_sqlx_error(e)), + }; + + // Migrate the schema to V1 if the catalog table does not support views and the caller + // opted in via `sql.schema-version=V1`. + let schema_version = if is_v1 { + tracing::debug!("detected {CATALOG_TABLE_NAME} schema v1 which already supports views"); + SchemaVersion::V1 + } else if config.schema_version == Some(SchemaVersion::V1) { + tracing::warn!( + "detected {CATALOG_TABLE_NAME} schema v0; migrating to v1 to enable view support" + ); + if let Some(migration_sql) = SchemaVersion::V1.migration_sql() { + sqlx::query(&migration_sql) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; + } + SchemaVersion::V1 + } else { + tracing::warn!( + "detected v0 {CATALOG_TABLE_NAME} schema; SqlCatalog is initialized without view support. 
To auto-migrate the database's schema and enable view support, set {}=V1", + SQL_CATALOG_PROP_SCHEMA_VERSION + ); + SchemaVersion::V0 + }; + Ok(SqlCatalog { name: config.name.to_owned(), connection: pool, @@ -317,6 +431,7 @@ impl SqlCatalog { fileio, sql_bind_style: config.sql_bind_style, runtime, + schema_version, }) } @@ -688,10 +803,8 @@ impl Catalog for SqlCatalog { FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND {CATALOG_FIELD_CATALOG_NAME} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )", + {}", + self.schema_version.record_type_filter() ), vec![Some(&namespace.join(".")), Some(&self.name)], ) @@ -727,10 +840,8 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {}", + self.schema_version.record_type_filter() ), vec![Some(&namespace), Some(&self.name), Some(table_name)], ) @@ -754,10 +865,8 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {}", + self.schema_version.record_type_filter() ), vec![ Some(&self.name), @@ -795,10 +904,8 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {}", + self.schema_version.record_type_filter() ), vec![ Some(&self.name), @@ -923,10 +1030,8 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? 
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {}", + self.schema_version.record_type_filter() ), vec![ Some(dest.name()), @@ -995,11 +1100,9 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - ) - AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?" + {} + AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?", + self.schema_version.record_type_filter() ), vec![ Some(&staged_metadata_location_str), @@ -1037,14 +1140,16 @@ mod tests { use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; use itertools::Itertools; use regex::Regex; + use sqlx::any::install_default_drivers; use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; use crate::catalog::{ - NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI, + NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, + SQL_CATALOG_PROP_BIND_STYLE_LEGACY, SQL_CATALOG_PROP_SCHEMA_VERSION, SQL_CATALOG_PROP_URI, SQL_CATALOG_PROP_WAREHOUSE, }; - use crate::{SqlBindStyle, SqlCatalogBuilder}; + use crate::{SchemaVersion, SqlBindStyle, SqlCatalogBuilder}; const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; @@ -2046,4 +2151,178 @@ mod tests { format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"), ); } + + /// Creates a V0 SQLite database (no `iceberg_type` column) with one pre-inserted table row. + /// Returns the SQLite URI and the temp dir that owns the database file. 
+ async fn create_v0_sqlite_db() -> (String, TempDir) { + let temp_dir = TempDir::new().unwrap(); + let uri = format!( + "sqlite:{}", + temp_dir.path().join("catalog.db").to_str().unwrap() + ); + sqlx::Sqlite::create_database(&uri).await.unwrap(); + let pool = sqlx::AnyPool::connect(&uri).await.unwrap(); + sqlx::query( + "CREATE TABLE iceberg_tables ( + catalog_name VARCHAR(255) NOT NULL, + table_namespace VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + metadata_location VARCHAR(1000), + previous_metadata_location VARCHAR(1000), + PRIMARY KEY (catalog_name, table_namespace, table_name) + )", + ) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "INSERT INTO iceberg_tables + (catalog_name, table_namespace, table_name, metadata_location) + VALUES ('iceberg', 'ns', 'tbl', '/tmp/fake-location')", + ) + .execute(&pool) + .await + .unwrap(); + pool.close().await; + (uri, temp_dir) + } + + #[tokio::test] + async fn test_v0_schema_migration() { + install_default_drivers(); + + let (uri, temp_dir) = create_v0_sqlite_db().await; + + // Opening the catalog with sql.schema-version=V1 should migrate the V0 schema. + let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), uri), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + temp_dir.path().to_str().unwrap().to_string(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ( + SQL_CATALOG_PROP_SCHEMA_VERSION.to_string(), + SchemaVersion::V1.to_string(), + ), + ]); + let catalog = SqlCatalogBuilder::default() + .with_storage_factory(Arc::new(LocalFsStorageFactory)) + .load("iceberg", props) + .await + .expect("should open V0 catalog and migrate schema when sql.schema-version=V1"); + + // The V0 row (no "iceberg_type" column) should be treated as a TABLE after migration. 
+ let ns = NamespaceIdent::from_strs(["ns"]).unwrap(); + let tables = catalog.list_tables(&ns).await.unwrap(); + assert_eq!(tables.len(), 1); + assert_eq!(tables[0].name(), "tbl"); + } + + #[tokio::test] + async fn test_v0_schema_no_migration_without_property() { + install_default_drivers(); + + let (uri, temp_dir) = create_v0_sqlite_db().await; + + // Opening without sql.schema-version=V1 should NOT migrate — but should still work. + let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), uri.clone()), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + temp_dir.path().to_str().unwrap().to_string(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ]); + let catalog = SqlCatalogBuilder::default() + .with_storage_factory(Arc::new(LocalFsStorageFactory)) + .load("iceberg", props) + .await + .expect("should open V0 catalog without migrating"); + + assert_eq!(catalog.schema_version, SchemaVersion::V0); + + // The table should still be visible via V0 queries (no iceberg_type filter). + let ns = NamespaceIdent::from_strs(["ns"]).unwrap(); + let tables = catalog.list_tables(&ns).await.unwrap(); + assert_eq!(tables.len(), 1); + assert_eq!(tables[0].name(), "tbl"); + + // Confirm the column was NOT added to the database. + let probe_pool = sqlx::AnyPool::connect(&uri).await.unwrap(); + let column_exists = sqlx::query("SELECT iceberg_type FROM iceberg_tables LIMIT 0") + .execute(&probe_pool) + .await + .is_ok(); + probe_pool.close().await; + assert!( + !column_exists, + "iceberg_type column should not exist when sql.schema-version=V1 was not set" + ); + } + + #[tokio::test] + async fn test_invalid_schema_version_is_rejected() { + install_default_drivers(); + + let (uri, temp_dir) = create_v0_sqlite_db().await; + + // An unrecognized sql.schema-version value must fail fast rather than silently + // falling back to V0. 
+ let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), uri), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + temp_dir.path().to_str().unwrap().to_string(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ( + SQL_CATALOG_PROP_SCHEMA_VERSION.to_string(), + "v2".to_string(), + ), + ]); + let result = SqlCatalogBuilder::default() + .with_storage_factory(Arc::new(LocalFsStorageFactory)) + .load("iceberg", props) + .await; + + let err = result.expect_err("an invalid sql.schema-version should be rejected"); + assert_eq!(err.kind(), iceberg::ErrorKind::DataInvalid); + } + + #[tokio::test] + async fn test_legacy_bind_style_key_is_accepted() { + install_default_drivers(); + + let (uri, temp_dir) = create_v0_sqlite_db().await; + + // The legacy `sql_bind_style` key must keep working alongside the new `sql.bind-style`. + let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), uri), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + temp_dir.path().to_str().unwrap().to_string(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE_LEGACY.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ]); + let catalog = SqlCatalogBuilder::default() + .with_storage_factory(Arc::new(LocalFsStorageFactory)) + .load("iceberg", props) + .await + .expect("legacy sql_bind_style key should still be accepted"); + + assert_eq!(catalog.sql_bind_style, SqlBindStyle::QMark); + } }