diff --git a/crates/integrations/datafusion/src/sql_handler.rs b/crates/integrations/datafusion/src/sql_handler.rs index 78f64bb3..b32b63ff 100644 --- a/crates/integrations/datafusion/src/sql_handler.rs +++ b/crates/integrations/datafusion/src/sql_handler.rs @@ -44,10 +44,14 @@ use datafusion::sql::sqlparser::ast::{ use datafusion::sql::sqlparser::dialect::GenericDialect; use datafusion::sql::sqlparser::parser::Parser; use paimon::catalog::{Catalog, Identifier}; -use paimon::spec::SchemaChange; +use paimon::spec::{ + ArrayType as PaimonArrayType, BigIntType, BlobType, BooleanType, DataField as PaimonDataField, + DataType as PaimonDataType, DateType, DecimalType, DoubleType, FloatType, IntType, + LocalZonedTimestampType, MapType as PaimonMapType, RowType as PaimonRowType, SchemaChange, + SmallIntType, TimestampType, TinyIntType, VarBinaryType, VarCharType, +}; use crate::error::to_datafusion_error; -use paimon::arrow::arrow_to_paimon_type; /// Wraps a [`SessionContext`] and a Paimon [`Catalog`] to handle DDL statements /// that DataFusion does not natively support (e.g. ALTER TABLE). @@ -136,15 +140,7 @@ impl PaimonSqlHandler { // Columns for col in &ct.columns { - let arrow_type = sql_data_type_to_arrow(&col.data_type)?; - let nullable = !col.options.iter().any(|opt| { - matches!( - opt.option, - datafusion::sql::sqlparser::ast::ColumnOption::NotNull - ) - }); - let paimon_type = - arrow_to_paimon_type(&arrow_type, nullable).map_err(to_datafusion_error)?; + let paimon_type = column_def_to_paimon_type(col)?; builder = builder.column(col.name.value.clone(), paimon_type); } @@ -324,110 +320,149 @@ impl PaimonSqlHandler { /// Convert a sqlparser [`ColumnDef`] to a Paimon [`SchemaChange::AddColumn`]. fn column_def_to_add_column(col: &ColumnDef) -> DFResult { - let arrow_type = sql_data_type_to_arrow(&col.data_type)?; - let nullable = !col.options.iter().any(|opt| { - matches!( - opt.option, - datafusion::sql::sqlparser::ast::ColumnOption::NotNull - ) - }); - let paimon_type = arrow_to_paimon_type(&arrow_type, nullable).map_err(to_datafusion_error)?; + let paimon_type = column_def_to_paimon_type(col)?; Ok(SchemaChange::add_column( col.name.value.clone(), paimon_type, )) } -/// Convert a sqlparser SQL data type to an Arrow data type. -fn sql_data_type_to_arrow( +fn column_def_to_paimon_type(col: &ColumnDef) -> DFResult { + sql_data_type_to_paimon_type(&col.data_type, column_def_nullable(col)) +} + +fn column_def_nullable(col: &ColumnDef) -> bool { + !col.options.iter().any(|opt| { + matches!( + opt.option, + datafusion::sql::sqlparser::ast::ColumnOption::NotNull + ) + }) +} + +/// Convert a sqlparser SQL data type to a Paimon data type. +/// +/// DDL schema translation must use this function instead of going through Arrow, +/// because Arrow cannot preserve logical distinctions such as `BLOB` vs `VARBINARY`. +fn sql_data_type_to_paimon_type( sql_type: &datafusion::sql::sqlparser::ast::DataType, -) -> DFResult { - use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as SqlType}; + nullable: bool, +) -> DFResult { + use datafusion::sql::sqlparser::ast::{ + ArrayElemTypeDef, DataType as SqlType, ExactNumberInfo, TimezoneInfo, + }; + match sql_type { - SqlType::Boolean => Ok(ArrowDataType::Boolean), - SqlType::TinyInt(_) => Ok(ArrowDataType::Int8), - SqlType::SmallInt(_) => Ok(ArrowDataType::Int16), - SqlType::Int(_) | SqlType::Integer(_) => Ok(ArrowDataType::Int32), - SqlType::BigInt(_) => Ok(ArrowDataType::Int64), - SqlType::Float(_) => Ok(ArrowDataType::Float32), - SqlType::Real => Ok(ArrowDataType::Float32), - SqlType::Double(_) | SqlType::DoublePrecision => Ok(ArrowDataType::Float64), - SqlType::Varchar(_) | SqlType::CharVarying(_) | SqlType::Text | SqlType::String(_) => { - Ok(ArrowDataType::Utf8) + SqlType::Boolean => Ok(PaimonDataType::Boolean(BooleanType::with_nullable( + nullable, + ))), + SqlType::TinyInt(_) => Ok(PaimonDataType::TinyInt(TinyIntType::with_nullable( + nullable, + ))), + SqlType::SmallInt(_) => Ok(PaimonDataType::SmallInt(SmallIntType::with_nullable( + nullable, + ))), + SqlType::Int(_) | SqlType::Integer(_) => { + Ok(PaimonDataType::Int(IntType::with_nullable(nullable))) + } + SqlType::BigInt(_) => Ok(PaimonDataType::BigInt(BigIntType::with_nullable(nullable))), + SqlType::Float(_) | SqlType::Real => { + Ok(PaimonDataType::Float(FloatType::with_nullable(nullable))) + } + SqlType::Double(_) | SqlType::DoublePrecision => { + Ok(PaimonDataType::Double(DoubleType::with_nullable(nullable))) } - SqlType::Char(_) | SqlType::Character(_) => Ok(ArrowDataType::Utf8), - SqlType::Binary(_) | SqlType::Varbinary(_) | SqlType::Blob(_) | SqlType::Bytea => { - Ok(ArrowDataType::Binary) + SqlType::Varchar(_) + | SqlType::CharVarying(_) + | SqlType::Text + | SqlType::String(_) + | SqlType::Char(_) + | SqlType::Character(_) => Ok(PaimonDataType::VarChar( + VarCharType::with_nullable(nullable, VarCharType::MAX_LENGTH) + .map_err(to_datafusion_error)?, + )), + SqlType::Binary(_) | SqlType::Varbinary(_) | SqlType::Bytea => { + Ok(PaimonDataType::VarBinary( + VarBinaryType::try_new(nullable, VarBinaryType::MAX_LENGTH) + .map_err(to_datafusion_error)?, + )) } - SqlType::Date => Ok(ArrowDataType::Date32), + SqlType::Blob(_) => Ok(PaimonDataType::Blob(BlobType::with_nullable(nullable))), + SqlType::Date => Ok(PaimonDataType::Date(DateType::with_nullable(nullable))), SqlType::Timestamp(precision, tz_info) => { - use datafusion::sql::sqlparser::ast::TimezoneInfo; - let unit = match precision { - Some(0) => datafusion::arrow::datatypes::TimeUnit::Second, - Some(1..=3) | None => datafusion::arrow::datatypes::TimeUnit::Millisecond, - Some(4..=6) => datafusion::arrow::datatypes::TimeUnit::Microsecond, - _ => datafusion::arrow::datatypes::TimeUnit::Nanosecond, + let precision = match precision { + Some(0) => 0, + Some(1..=3) | None => 3, + Some(4..=6) => 6, + _ => 9, }; - let tz = match tz_info { - TimezoneInfo::None | TimezoneInfo::WithoutTimeZone => None, - _ => Some("UTC".into()), - }; - Ok(ArrowDataType::Timestamp(unit, tz)) + match tz_info { + TimezoneInfo::None | TimezoneInfo::WithoutTimeZone => { + Ok(PaimonDataType::Timestamp( + TimestampType::with_nullable(nullable, precision) + .map_err(to_datafusion_error)?, + )) + } + _ => Ok(PaimonDataType::LocalZonedTimestamp( + LocalZonedTimestampType::with_nullable(nullable, precision) + .map_err(to_datafusion_error)?, + )), + } } SqlType::Decimal(info) => { - use datafusion::sql::sqlparser::ast::ExactNumberInfo; - let (p, s) = match info { - ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8), - ExactNumberInfo::Precision(p) => (*p as u8, 0), + let (precision, scale) = match info { + ExactNumberInfo::PrecisionAndScale(precision, scale) => { + (*precision as u32, *scale as u32) + } + ExactNumberInfo::Precision(precision) => (*precision as u32, 0), ExactNumberInfo::None => (10, 0), }; - Ok(ArrowDataType::Decimal128(p, s)) + Ok(PaimonDataType::Decimal( + DecimalType::with_nullable(nullable, precision, scale) + .map_err(to_datafusion_error)?, + )) } SqlType::Array(elem_def) => { - let elem_type = match elem_def { + let element_type = match elem_def { ArrayElemTypeDef::AngleBracket(t) | ArrayElemTypeDef::SquareBracket(t, _) - | ArrayElemTypeDef::Parenthesis(t) => sql_data_type_to_arrow(t)?, + | ArrayElemTypeDef::Parenthesis(t) => sql_data_type_to_paimon_type(t, true)?, ArrayElemTypeDef::None => { return Err(DataFusionError::Plan( "ARRAY type requires an element type".to_string(), )); } }; - Ok(ArrowDataType::List(Arc::new(Field::new( - "element", elem_type, true, - )))) + Ok(PaimonDataType::Array(PaimonArrayType::with_nullable( + nullable, + element_type, + ))) } SqlType::Map(key_type, value_type) => { - let key = sql_data_type_to_arrow(key_type)?; - let value = sql_data_type_to_arrow(value_type)?; - let entries = Field::new( - "entries", - ArrowDataType::Struct( - vec![ - Field::new("key", key, false), - Field::new("value", value, true), - ] - .into(), - ), - false, - ); - Ok(ArrowDataType::Map(Arc::new(entries), false)) + let key = sql_data_type_to_paimon_type(key_type, false)?; + let value = sql_data_type_to_paimon_type(value_type, true)?; + Ok(PaimonDataType::Map(PaimonMapType::with_nullable( + nullable, key, value, + ))) } SqlType::Struct(fields, _) => { - let arrow_fields: Vec = fields + let paimon_fields = fields .iter() - .map(|f| { - let name = f + .enumerate() + .map(|(idx, field)| { + let name = field .field_name .as_ref() .map(|n| n.value.clone()) .unwrap_or_default(); - let dt = sql_data_type_to_arrow(&f.field_type)?; - Ok(Field::new(name, dt, true)) + let data_type = sql_data_type_to_paimon_type(&field.field_type, true)?; + Ok(PaimonDataField::new(idx as i32, name, data_type)) }) - .collect::>()?; - Ok(ArrowDataType::Struct(arrow_fields.into())) + .collect::>>()?; + Ok(PaimonDataType::Row(PaimonRowType::with_nullable( + nullable, + paimon_fields, + ))) } _ => Err(DataFusionError::Plan(format!( "Unsupported SQL data type: {sql_type}" @@ -494,9 +529,8 @@ mod tests { use std::sync::Mutex; use async_trait::async_trait; - use datafusion::arrow::datatypes::TimeUnit; use paimon::catalog::Database; - use paimon::spec::Schema as PaimonSchema; + use paimon::spec::{DataType as PaimonDataType, Schema as PaimonSchema}; use paimon::table::Table; // ==================== Mock Catalog ==================== @@ -619,56 +653,57 @@ mod tests { PaimonSqlHandler::new(SessionContext::new(), catalog, "paimon") } - // ==================== sql_data_type_to_arrow tests ==================== + fn assert_sql_type_to_paimon( + sql_type: datafusion::sql::sqlparser::ast::DataType, + expected: PaimonDataType, + ) { + assert_eq!( + sql_data_type_to_paimon_type(&sql_type, true).unwrap(), + expected + ); + } + + // ==================== sql_data_type_to_paimon_type tests ==================== #[test] fn test_sql_type_boolean() { use datafusion::sql::sqlparser::ast::DataType as SqlType; - assert_eq!( - sql_data_type_to_arrow(&SqlType::Boolean).unwrap(), - ArrowDataType::Boolean + assert_sql_type_to_paimon( + SqlType::Boolean, + PaimonDataType::Boolean(BooleanType::new()), ); } #[test] fn test_sql_type_integers() { use datafusion::sql::sqlparser::ast::DataType as SqlType; - assert_eq!( - sql_data_type_to_arrow(&SqlType::TinyInt(None)).unwrap(), - ArrowDataType::Int8 - ); - assert_eq!( - sql_data_type_to_arrow(&SqlType::SmallInt(None)).unwrap(), - ArrowDataType::Int16 - ); - assert_eq!( - sql_data_type_to_arrow(&SqlType::Int(None)).unwrap(), - ArrowDataType::Int32 + assert_sql_type_to_paimon( + SqlType::TinyInt(None), + PaimonDataType::TinyInt(TinyIntType::new()), ); - assert_eq!( - sql_data_type_to_arrow(&SqlType::Integer(None)).unwrap(), - ArrowDataType::Int32 + assert_sql_type_to_paimon( + SqlType::SmallInt(None), + PaimonDataType::SmallInt(SmallIntType::new()), ); - assert_eq!( - sql_data_type_to_arrow(&SqlType::BigInt(None)).unwrap(), - ArrowDataType::Int64 + assert_sql_type_to_paimon(SqlType::Int(None), PaimonDataType::Int(IntType::new())); + assert_sql_type_to_paimon(SqlType::Integer(None), PaimonDataType::Int(IntType::new())); + assert_sql_type_to_paimon( + SqlType::BigInt(None), + PaimonDataType::BigInt(BigIntType::new()), ); } #[test] fn test_sql_type_floats() { use datafusion::sql::sqlparser::ast::{DataType as SqlType, ExactNumberInfo}; - assert_eq!( - sql_data_type_to_arrow(&SqlType::Float(ExactNumberInfo::None)).unwrap(), - ArrowDataType::Float32 + assert_sql_type_to_paimon( + SqlType::Float(ExactNumberInfo::None), + PaimonDataType::Float(FloatType::new()), ); - assert_eq!( - sql_data_type_to_arrow(&SqlType::Real).unwrap(), - ArrowDataType::Float32 - ); - assert_eq!( - sql_data_type_to_arrow(&SqlType::DoublePrecision).unwrap(), - ArrowDataType::Float64 + assert_sql_type_to_paimon(SqlType::Real, PaimonDataType::Float(FloatType::new())); + assert_sql_type_to_paimon( + SqlType::DoublePrecision, + PaimonDataType::Double(DoubleType::new()), ); } @@ -676,10 +711,11 @@ mod tests { fn test_sql_type_string_variants() { use datafusion::sql::sqlparser::ast::DataType as SqlType; for sql_type in [SqlType::Varchar(None), SqlType::Text, SqlType::String(None)] { - assert_eq!( - sql_data_type_to_arrow(&sql_type).unwrap(), - ArrowDataType::Utf8, - "failed for {sql_type:?}" + assert_sql_type_to_paimon( + sql_type.clone(), + PaimonDataType::VarChar( + VarCharType::with_nullable(true, VarCharType::MAX_LENGTH).unwrap(), + ), ); } } @@ -687,133 +723,120 @@ mod tests { #[test] fn test_sql_type_binary() { use datafusion::sql::sqlparser::ast::DataType as SqlType; - assert_eq!( - sql_data_type_to_arrow(&SqlType::Bytea).unwrap(), - ArrowDataType::Binary + assert_sql_type_to_paimon( + SqlType::Bytea, + PaimonDataType::VarBinary( + VarBinaryType::try_new(true, VarBinaryType::MAX_LENGTH).unwrap(), + ), ); } #[test] fn test_sql_type_date() { use datafusion::sql::sqlparser::ast::DataType as SqlType; - assert_eq!( - sql_data_type_to_arrow(&SqlType::Date).unwrap(), - ArrowDataType::Date32 - ); + assert_sql_type_to_paimon(SqlType::Date, PaimonDataType::Date(DateType::new())); } #[test] fn test_sql_type_timestamp_default() { use datafusion::sql::sqlparser::ast::{DataType as SqlType, TimezoneInfo}; - let result = sql_data_type_to_arrow(&SqlType::Timestamp(None, TimezoneInfo::None)).unwrap(); - assert_eq!( - result, - ArrowDataType::Timestamp(TimeUnit::Millisecond, None) + assert_sql_type_to_paimon( + SqlType::Timestamp(None, TimezoneInfo::None), + PaimonDataType::Timestamp(TimestampType::with_nullable(true, 3).unwrap()), ); } #[test] fn test_sql_type_timestamp_with_precision() { use datafusion::sql::sqlparser::ast::{DataType as SqlType, TimezoneInfo}; - // precision 0 => Second - assert_eq!( - sql_data_type_to_arrow(&SqlType::Timestamp(Some(0), TimezoneInfo::None)).unwrap(), - ArrowDataType::Timestamp(TimeUnit::Second, None) + assert_sql_type_to_paimon( + SqlType::Timestamp(Some(0), TimezoneInfo::None), + PaimonDataType::Timestamp(TimestampType::with_nullable(true, 0).unwrap()), ); - // precision 3 => Millisecond - assert_eq!( - sql_data_type_to_arrow(&SqlType::Timestamp(Some(3), TimezoneInfo::None)).unwrap(), - ArrowDataType::Timestamp(TimeUnit::Millisecond, None) + assert_sql_type_to_paimon( + SqlType::Timestamp(Some(3), TimezoneInfo::None), + PaimonDataType::Timestamp(TimestampType::with_nullable(true, 3).unwrap()), ); - // precision 6 => Microsecond - assert_eq!( - sql_data_type_to_arrow(&SqlType::Timestamp(Some(6), TimezoneInfo::None)).unwrap(), - ArrowDataType::Timestamp(TimeUnit::Microsecond, None) + assert_sql_type_to_paimon( + SqlType::Timestamp(Some(6), TimezoneInfo::None), + PaimonDataType::Timestamp(TimestampType::with_nullable(true, 6).unwrap()), ); - // precision 9 => Nanosecond - assert_eq!( - sql_data_type_to_arrow(&SqlType::Timestamp(Some(9), TimezoneInfo::None)).unwrap(), - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None) + assert_sql_type_to_paimon( + SqlType::Timestamp(Some(9), TimezoneInfo::None), + PaimonDataType::Timestamp(TimestampType::with_nullable(true, 9).unwrap()), ); } #[test] fn test_sql_type_timestamp_with_tz() { use datafusion::sql::sqlparser::ast::{DataType as SqlType, TimezoneInfo}; - let result = - sql_data_type_to_arrow(&SqlType::Timestamp(None, TimezoneInfo::WithTimeZone)).unwrap(); - assert_eq!( - result, - ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())) + assert_sql_type_to_paimon( + SqlType::Timestamp(None, TimezoneInfo::WithTimeZone), + PaimonDataType::LocalZonedTimestamp( + LocalZonedTimestampType::with_nullable(true, 3).unwrap(), + ), ); } #[test] fn test_sql_type_decimal() { use datafusion::sql::sqlparser::ast::{DataType as SqlType, ExactNumberInfo}; - assert_eq!( - sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::PrecisionAndScale(18, 2))) - .unwrap(), - ArrowDataType::Decimal128(18, 2) + assert_sql_type_to_paimon( + SqlType::Decimal(ExactNumberInfo::PrecisionAndScale(18, 2)), + PaimonDataType::Decimal(DecimalType::with_nullable(true, 18, 2).unwrap()), ); - assert_eq!( - sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::Precision(10))).unwrap(), - ArrowDataType::Decimal128(10, 0) + assert_sql_type_to_paimon( + SqlType::Decimal(ExactNumberInfo::Precision(10)), + PaimonDataType::Decimal(DecimalType::with_nullable(true, 10, 0).unwrap()), ); - assert_eq!( - sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::None)).unwrap(), - ArrowDataType::Decimal128(10, 0) + assert_sql_type_to_paimon( + SqlType::Decimal(ExactNumberInfo::None), + PaimonDataType::Decimal(DecimalType::with_nullable(true, 10, 0).unwrap()), ); } #[test] fn test_sql_type_unsupported() { use datafusion::sql::sqlparser::ast::DataType as SqlType; - assert!(sql_data_type_to_arrow(&SqlType::Regclass).is_err()); + assert!(sql_data_type_to_paimon_type(&SqlType::Regclass, true).is_err()); } #[test] fn test_sql_type_array() { use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as SqlType}; - let result = sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::AngleBracket( - Box::new(SqlType::Int(None)), - ))) - .unwrap(); - assert_eq!( - result, - ArrowDataType::List(Arc::new(Field::new("element", ArrowDataType::Int32, true))) + assert_sql_type_to_paimon( + SqlType::Array(ArrayElemTypeDef::AngleBracket(Box::new(SqlType::Int(None)))), + PaimonDataType::Array(PaimonArrayType::with_nullable( + true, + PaimonDataType::Int(IntType::new()), + )), ); } #[test] fn test_sql_type_array_no_element() { use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as SqlType}; - assert!(sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::None)).is_err()); + assert!( + sql_data_type_to_paimon_type(&SqlType::Array(ArrayElemTypeDef::None), true).is_err() + ); } #[test] fn test_sql_type_map() { use datafusion::sql::sqlparser::ast::DataType as SqlType; - let result = sql_data_type_to_arrow(&SqlType::Map( - Box::new(SqlType::Varchar(None)), - Box::new(SqlType::Int(None)), - )) - .unwrap(); - let expected = ArrowDataType::Map( - Arc::new(Field::new( - "entries", - ArrowDataType::Struct( - vec![ - Field::new("key", ArrowDataType::Utf8, false), - Field::new("value", ArrowDataType::Int32, true), - ] - .into(), + assert_sql_type_to_paimon( + SqlType::Map( + Box::new(SqlType::Varchar(None)), + Box::new(SqlType::Int(None)), + ), + PaimonDataType::Map(PaimonMapType::with_nullable( + true, + PaimonDataType::VarChar( + VarCharType::with_nullable(false, VarCharType::MAX_LENGTH).unwrap(), ), - false, + PaimonDataType::Int(IntType::new()), )), - false, ); - assert_eq!(result, expected); } #[test] @@ -821,31 +844,35 @@ mod tests { use datafusion::sql::sqlparser::ast::{ DataType as SqlType, Ident, StructBracketKind, StructField, }; - let result = sql_data_type_to_arrow(&SqlType::Struct( - vec![ - StructField { - field_name: Some(Ident::new("name")), - field_type: SqlType::Varchar(None), - options: None, - }, - StructField { - field_name: Some(Ident::new("age")), - field_type: SqlType::Int(None), - options: None, - }, - ], - StructBracketKind::AngleBrackets, - )) - .unwrap(); - assert_eq!( - result, - ArrowDataType::Struct( + assert_sql_type_to_paimon( + SqlType::Struct( vec![ - Field::new("name", ArrowDataType::Utf8, true), - Field::new("age", ArrowDataType::Int32, true), - ] - .into() - ) + StructField { + field_name: Some(Ident::new("name")), + field_type: SqlType::Varchar(None), + options: None, + }, + StructField { + field_name: Some(Ident::new("age")), + field_type: SqlType::Int(None), + options: None, + }, + ], + StructBracketKind::AngleBrackets, + ), + PaimonDataType::Row(PaimonRowType::with_nullable( + true, + vec![ + PaimonDataField::new( + 0, + "name".to_string(), + PaimonDataType::VarChar( + VarCharType::with_nullable(true, VarCharType::MAX_LENGTH).unwrap(), + ), + ), + PaimonDataField::new(1, "age".to_string(), PaimonDataType::Int(IntType::new())), + ], + )), ); } @@ -1040,6 +1067,30 @@ mod tests { } } + #[tokio::test] + async fn test_create_table_blob_type_preserved() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("CREATE TABLE mydb.t1 (payload BLOB NOT NULL)") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::CreateTable { schema, .. } = &calls[0] { + assert_eq!(schema.fields().len(), 1); + assert!(matches!( + schema.fields()[0].data_type(), + PaimonDataType::Blob(_) + )); + assert!(!schema.fields()[0].data_type().is_nullable()); + } else { + panic!("expected CreateTable call"); + } + } + #[tokio::test] async fn test_alter_table_add_column() { let catalog = Arc::new(MockCatalog::new()); @@ -1069,6 +1120,33 @@ mod tests { } } + #[tokio::test] + async fn test_alter_table_add_blob_column() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("ALTER TABLE mydb.t1 ADD COLUMN payload BLOB") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::AlterTable { changes, .. } = &calls[0] { + assert_eq!(changes.len(), 1); + assert!(matches!( + &changes[0], + SchemaChange::AddColumn { + field_name, + data_type, + .. + } if field_name == "payload" && matches!(data_type, PaimonDataType::Blob(_)) + )); + } else { + panic!("expected AlterTable call"); + } + } + #[tokio::test] async fn test_alter_table_drop_column() { let catalog = Arc::new(MockCatalog::new()); diff --git a/crates/integrations/datafusion/tests/sql_handler_tests.rs b/crates/integrations/datafusion/tests/sql_handler_tests.rs index c3608b31..5e9105a7 100644 --- a/crates/integrations/datafusion/tests/sql_handler_tests.rs +++ b/crates/integrations/datafusion/tests/sql_handler_tests.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use datafusion::catalog::CatalogProvider; use datafusion::prelude::SessionContext; use paimon::catalog::Identifier; -use paimon::spec::{ArrayType, DataType, IntType, MapType, VarCharType}; +use paimon::spec::{ArrayType, BlobType, DataType, IntType, MapType, VarCharType}; use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner, PaimonSqlHandler}; use tempfile::TempDir; @@ -147,6 +147,40 @@ async fn test_create_table() { assert_eq!(schema.primary_keys(), &["id"]); } +#[tokio::test] +async fn test_create_table_with_blob_type() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + handler + .sql( + "CREATE TABLE paimon.mydb.assets ( + id INT NOT NULL, + payload BLOB, + PRIMARY KEY (id) + )", + ) + .await + .expect("CREATE TABLE with BLOB should succeed"); + + let table = catalog + .get_table(&Identifier::new("mydb", "assets")) + .await + .unwrap(); + let schema = table.schema(); + assert_eq!(schema.fields().len(), 2); + assert_eq!(schema.primary_keys(), &["id"]); + assert_eq!( + *schema.fields()[1].data_type(), + DataType::Blob(BlobType::new()) + ); +} + #[tokio::test] async fn test_create_table_with_partition() { let (_tmp, catalog) = create_test_env(); diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 74de8a28..0ff0a2c2 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -755,6 +755,7 @@ fn literal_scalar_for_parquet_filter( DataType::Time(_) | DataType::Timestamp(_) | DataType::LocalZonedTimestamp(_) + | DataType::Blob(_) | DataType::Array(_) | DataType::Map(_) | DataType::Multiset(_) diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs index 37f907bb..50a3a68b 100644 --- a/crates/paimon/src/arrow/mod.rs +++ b/crates/paimon/src/arrow/mod.rs @@ -39,7 +39,9 @@ pub fn paimon_type_to_arrow(dt: &PaimonDataType) -> crate::Result PaimonDataType::Float(_) => ArrowDataType::Float32, PaimonDataType::Double(_) => ArrowDataType::Float64, PaimonDataType::VarChar(_) | PaimonDataType::Char(_) => ArrowDataType::Utf8, - PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) => ArrowDataType::Binary, + PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) | PaimonDataType::Blob(_) => { + ArrowDataType::Binary + } PaimonDataType::Date(_) => ArrowDataType::Date32, PaimonDataType::Time(_) => ArrowDataType::Time32(TimeUnit::Millisecond), PaimonDataType::Timestamp(t) => { @@ -341,6 +343,17 @@ mod tests { } } + #[test] + fn test_blob_type_maps_one_way_to_arrow_binary() { + let blob = PaimonDataType::Blob(BlobType::new()); + let varbinary = PaimonDataType::VarBinary( + VarBinaryType::try_new(true, VarBinaryType::MAX_LENGTH).unwrap(), + ); + + assert_paimon_to_arrow(&blob, &ArrowDataType::Binary); + assert_arrow_to_paimon(&ArrowDataType::Binary, true, &varbinary); + } + #[test] fn test_timestamp_roundtrip() { // millisecond precision diff --git a/crates/paimon/src/spec/partition_utils.rs b/crates/paimon/src/spec/partition_utils.rs index 5d05863c..427ea35f 100644 --- a/crates/paimon/src/spec/partition_utils.rs +++ b/crates/paimon/src/spec/partition_utils.rs @@ -265,6 +265,7 @@ fn format_partition_value( | DataType::Double(_) | DataType::Binary(_) | DataType::VarBinary(_) + | DataType::Blob(_) | DataType::Array(_) | DataType::Map(_) | DataType::Multiset(_) diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs index d86c068c..7e0f78c9 100644 --- a/crates/paimon/src/spec/types.rs +++ b/crates/paimon/src/spec/types.rs @@ -76,6 +76,8 @@ pub enum DataType { Binary(BinaryType), /// Data type of a variable-length binary string (=a sequence of bytes). VarBinary(VarBinaryType), + /// Data type of binary large object. + Blob(BlobType), /// Data type of a fixed-length character string. Char(CharType), /// Data type of a variable-length character string. @@ -117,6 +119,20 @@ impl DataType { } } + /// Returns whether this type is or contains (recursively) a [`BlobType`]. + pub fn contains_blob_type(&self) -> bool { + match self { + DataType::Blob(_) => true, + DataType::Array(v) => v.element_type.contains_blob_type(), + DataType::Map(v) => { + v.key_type.contains_blob_type() || v.value_type.contains_blob_type() + } + DataType::Multiset(v) => v.element_type.contains_blob_type(), + DataType::Row(v) => v.fields.iter().any(|f| f.data_type().contains_blob_type()), + _ => false, + } + } + /// Returns whether this type is nullable. pub fn is_nullable(&self) -> bool { match self { @@ -130,6 +146,7 @@ impl DataType { DataType::Float(v) => v.nullable, DataType::Binary(v) => v.nullable, DataType::VarBinary(v) => v.nullable, + DataType::Blob(v) => v.nullable, DataType::Char(v) => v.nullable, DataType::VarChar(v) => v.nullable, DataType::Date(v) => v.nullable, @@ -165,6 +182,7 @@ impl DataType { DataType::VarBinary(v) => { DataType::VarBinary(VarBinaryType::try_new(nullable, v.length())?) } + DataType::Blob(_) => DataType::Blob(BlobType::with_nullable(nullable)), DataType::Char(v) => DataType::Char(CharType::with_nullable(nullable, v.length())?), DataType::VarChar(v) => { DataType::VarChar(VarCharType::with_nullable(nullable, v.length())?) @@ -386,6 +404,39 @@ impl BooleanType { } } +/// BlobType for paimon. +/// +/// Data type of binary large object. +/// +/// Impl Reference: . +#[serde_as] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct BlobType { + #[serde_as(as = "FromInto>")] + nullable: bool, +} + +impl Default for BlobType { + fn default() -> Self { + Self::new() + } +} + +impl BlobType { + pub fn new() -> Self { + Self::with_nullable(true) + } + + pub fn with_nullable(nullable: bool) -> Self { + Self { nullable } + } + + pub fn family(&self) -> DataTypeFamily { + DataTypeFamily::PREDEFINED + } +} + /// CharType for paimon. /// /// Data type of a fixed-length character string. @@ -1470,6 +1521,11 @@ mod serde_utils { const NAME: &'static str = "BOOLEAN"; } + pub struct BLOB; + impl DataTypeName for BLOB { + const NAME: &'static str = "BLOB"; + } + pub struct BINARY; impl DataTypeName for BINARY { const NAME: &'static str = "BINARY"; @@ -1655,7 +1711,10 @@ mod tests { let content = std::fs::read(&path) .unwrap_or_else(|err| panic!("fixtures {path:?} load failed: {err}")); - String::from_utf8(content).expect("fixtures content must be valid utf8") + String::from_utf8(content) + .expect("fixtures content must be valid utf8") + .trim_end_matches(['\n', '\r']) + .to_string() } fn test_cases() -> Vec<(&'static str, DataType)> { @@ -1696,6 +1755,11 @@ mod tests { length: 22, }), ), + ("blob_type", DataType::Blob(BlobType { nullable: false })), + ( + "blob_type_nullable", + DataType::Blob(BlobType { nullable: true }), + ), ( "boolean_type", DataType::Boolean(BooleanType { nullable: false }), diff --git a/crates/paimon/src/table/data_evolution_writer.rs b/crates/paimon/src/table/data_evolution_writer.rs index 9e2ffbb2..9a7d07dd 100644 --- a/crates/paimon/src/table/data_evolution_writer.rs +++ b/crates/paimon/src/table/data_evolution_writer.rs @@ -61,6 +61,14 @@ pub struct DataEvolutionWriter { matched_batches: Vec, } +fn schema_contains_blob_type(table: &Table) -> bool { + table + .schema() + .fields() + .iter() + .any(|field| field.data_type().contains_blob_type()) +} + impl DataEvolutionWriter { /// Create a new writer for the given table and update columns. /// @@ -73,6 +81,14 @@ impl DataEvolutionWriter { let schema = table.schema(); let core_options = CoreOptions::new(schema.options()); + if schema_contains_blob_type(table) { + return Err(crate::Error::Unsupported { + message: + "MERGE INTO does not support BlobType yet; blob write path is out of scope" + .to_string(), + }); + } + if !core_options.data_evolution_enabled() { return Err(crate::Error::Unsupported { message: @@ -470,6 +486,12 @@ impl DataEvolutionPartialWriter { let schema = table.schema(); let core_options = CoreOptions::new(schema.options()); + if schema_contains_blob_type(table) { + return Err(crate::Error::Unsupported { + message: "DataEvolutionPartialWriter does not support BlobType yet".to_string(), + }); + } + if !core_options.data_evolution_enabled() { return Err(crate::Error::Unsupported { message: "DataEvolutionPartialWriter requires data-evolution.enabled = true" @@ -586,7 +608,9 @@ mod tests { use super::*; use crate::catalog::Identifier; use crate::io::FileIOBuilder; - use crate::spec::{DataType, IntType, Schema, TableSchema, VarCharType}; + use crate::spec::{ + BlobType, DataField, DataType, IntType, RowType, Schema, TableSchema, VarCharType, + }; use arrow_array::StringArray; use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; use std::sync::Arc; @@ -640,6 +664,24 @@ mod tests { TableSchema::new(0, &schema) } + fn test_blob_data_evolution_schema() -> TableSchema { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column( + "payload", + DataType::Row(RowType::new(vec![DataField::new( + 1, + "blob".into(), + DataType::Blob(BlobType::new()), + )])), + ) + .option("data-evolution.enabled", "true") + .option("row-tracking.enabled", "true") + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + fn test_table(file_io: &FileIO, table_path: &str) -> Table { Table::new( file_io.clone(), @@ -650,6 +692,16 @@ mod tests { ) } + fn test_blob_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_de_blob_table"), + table_path.to_string(), + test_blob_data_evolution_schema(), + None, + ) + } + async fn setup_dirs(file_io: &FileIO, table_path: &str) { file_io .mkdirs(&format!("{table_path}/snapshot/")) @@ -859,4 +911,30 @@ mod tests { let result = DataEvolutionPartialWriter::new(&table, vec!["id".to_string()]); assert!(result.is_err()); } + + #[test] + fn test_rejects_blob_data_evolution_writer() { + let file_io = test_file_io(); + let table = test_blob_table(&file_io, "memory:/test_blob_de_writer"); + + let err = DataEvolutionWriter::new(&table, vec!["id".to_string()]) + .err() + .unwrap(); + assert!( + matches!(err, crate::Error::Unsupported { message } if message.contains("BlobType")) + ); + } + + #[test] + fn test_rejects_blob_partial_writer() { + let file_io = test_file_io(); + let table = test_blob_table(&file_io, "memory:/test_blob_partial_writer"); + + let err = DataEvolutionPartialWriter::new(&table, vec!["id".to_string()]) + .err() + .unwrap(); + assert!( + matches!(err, crate::Error::Unsupported { message } if message.contains("BlobType")) + ); + } } diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index be0bcb54..3d3cbf28 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -35,6 +35,12 @@ use std::sync::Arc; type PartitionBucketKey = (Vec, i32); +fn schema_contains_blob_type(fields: &[DataField]) -> bool { + fields + .iter() + .any(|field| field.data_type().contains_blob_type()) +} + /// TableWrite writes Arrow RecordBatches to Paimon data files. /// /// Each (partition, bucket) pair gets its own `DataFileWriter` held in a HashMap. @@ -64,6 +70,14 @@ impl TableWrite { let schema = table.schema(); let core_options = CoreOptions::new(schema.options()); + if schema_contains_blob_type(schema.fields()) { + return Err(crate::Error::Unsupported { + message: + "TableWrite does not support BlobType yet; blob write path is out of scope" + .to_string(), + }); + } + if !schema.primary_keys().is_empty() { return Err(crate::Error::Unsupported { message: "TableWrite does not support tables with primary keys".to_string(), @@ -308,8 +322,8 @@ mod tests { use crate::catalog::Identifier; use crate::io::{FileIO, FileIOBuilder}; use crate::spec::{ - DataType, DecimalType, IntType, LocalZonedTimestampType, Schema, TableSchema, - TimestampType, VarCharType, + BlobType, DataField, DataType, DecimalType, IntType, LocalZonedTimestampType, RowType, + Schema, TableSchema, TimestampType, VarCharType, }; use crate::table::{SnapshotManager, TableCommit}; use arrow_array::Int32Array; @@ -361,6 +375,30 @@ mod tests { ) } + fn test_blob_table_schema() -> TableSchema { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("payload", DataType::Blob(BlobType::new())) + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_nested_blob_table_schema() -> TableSchema { + let schema = Schema::builder() + .column( + "payload", + DataType::Row(RowType::new(vec![DataField::new( + 1, + "blob".into(), + DataType::Blob(BlobType::new()), + )])), + ) + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + async fn setup_dirs(file_io: &FileIO, table_path: &str) { file_io .mkdirs(&format!("{table_path}/snapshot/")) @@ -430,6 +468,38 @@ mod tests { assert_eq!(snapshot.total_record_count(), Some(3)); } + #[test] + fn test_rejects_blob_table() { + let table = Table::new( + test_file_io(), + Identifier::new("default", "test_blob_table"), + "memory:/test_blob_table".to_string(), + test_blob_table_schema(), + None, + ); + + let err = TableWrite::new(&table).err().unwrap(); + assert!( + matches!(err, crate::Error::Unsupported { message } if message.contains("BlobType")) + ); + } + + #[test] + fn test_rejects_nested_blob_table() { + let table = Table::new( + test_file_io(), + Identifier::new("default", "test_nested_blob_table"), + "memory:/test_nested_blob_table".to_string(), + test_nested_blob_table_schema(), + None, + ); + + let err = TableWrite::new(&table).err().unwrap(); + assert!( + matches!(err, crate::Error::Unsupported { message } if message.contains("BlobType")) + ); + } + #[tokio::test] async fn test_write_partitioned() { let file_io = test_file_io(); diff --git a/crates/paimon/tests/fixtures/blob_type.json b/crates/paimon/tests/fixtures/blob_type.json new file mode 100644 index 00000000..ba26b545 --- /dev/null +++ b/crates/paimon/tests/fixtures/blob_type.json @@ -0,0 +1 @@ +"BLOB NOT NULL" diff --git a/crates/paimon/tests/fixtures/blob_type_nullable.json b/crates/paimon/tests/fixtures/blob_type_nullable.json new file mode 100644 index 00000000..5b712342 --- /dev/null +++ b/crates/paimon/tests/fixtures/blob_type_nullable.json @@ -0,0 +1 @@ +"BLOB"