diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index ddfa2c04..eb8c644b 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -36,6 +36,8 @@ chrono = "0.4" datafusion = { workspace = true } paimon = { path = "../../paimon" } futures = "0.3" +serde = { version = "1", features = ["derive"] } +serde_json = "1" tokio = { workspace = true, features = ["rt", "time", "fs"] } [dev-dependencies] @@ -43,8 +45,6 @@ arrow-array = { workspace = true } arrow-schema = { workspace = true } flate2 = "1" parquet = { workspace = true } -serde = "1" -serde_json = "1" tar = "0.4" tempfile = "3" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs index fddb3b9c..65a2176d 100644 --- a/crates/integrations/datafusion/src/system_tables/mod.rs +++ b/crates/integrations/datafusion/src/system_tables/mod.rs @@ -30,10 +30,11 @@ use paimon::table::Table; use crate::error::to_datafusion_error; mod options; +mod schemas; type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>; -const TABLES: &[(&str, Builder)] = &[("options", options::build)]; +const TABLES: &[(&str, Builder)] = &[("options", options::build), ("schemas", schemas::build)]; /// Parse a Paimon object name into `(base_table, optional system_table_name)`.
/// @@ -117,6 +118,9 @@ mod tests { assert!(is_registered("options")); assert!(is_registered("Options")); assert!(is_registered("OPTIONS")); + assert!(is_registered("schemas")); + assert!(is_registered("Schemas")); + assert!(is_registered("SCHEMAS")); assert!(!is_registered("nonsense")); } diff --git a/crates/integrations/datafusion/src/system_tables/schemas.rs b/crates/integrations/datafusion/src/system_tables/schemas.rs new file mode 100644 index 00000000..03da7933 --- /dev/null +++ b/crates/integrations/datafusion/src/system_tables/schemas.rs @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Mirrors Java [SchemasTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java). 
+ +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use async_trait::async_trait; +use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray, TimestampMillisecondArray}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use datafusion::catalog::Session; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::{DataFusionError, Result as DFResult}; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::ExecutionPlan; +use paimon::table::Table; +use serde::Serialize; + +use crate::error::to_datafusion_error; + +pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> { + Ok(Arc::new(SchemasTable { table })) +} + +fn schemas_schema() -> SchemaRef { + static SCHEMA: OnceLock<SchemaRef> = OnceLock::new(); + SCHEMA + .get_or_init(|| { + Arc::new(Schema::new(vec![ + Field::new("schema_id", DataType::Int64, false), + Field::new("fields", DataType::Utf8, false), + Field::new("partition_keys", DataType::Utf8, false), + Field::new("primary_keys", DataType::Utf8, false), + Field::new("options", DataType::Utf8, false), + Field::new("comment", DataType::Utf8, true), + Field::new( + "update_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + ])) + }) + .clone() +} + +#[derive(Debug)] +struct SchemasTable { + table: Table, +} + +#[async_trait] +impl TableProvider for SchemasTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + schemas_schema() + } + + fn table_type(&self) -> TableType { + TableType::View + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec<usize>>, + _filters: &[Expr], + _limit: Option<usize>, + ) -> DFResult<Arc<dyn ExecutionPlan>> { + let schemas = self + .table + .schema_manager() + .list_all() + .await + .map_err(to_datafusion_error)?; + + let n = schemas.len(); + let mut schema_ids: Vec<i64> = Vec::with_capacity(n); + let mut fields_json: Vec<String> = Vec::with_capacity(n); + let mut
partition_keys_json: Vec<String> = Vec::with_capacity(n); + let mut primary_keys_json: Vec<String> = Vec::with_capacity(n); + let mut options_json: Vec<String> = Vec::with_capacity(n); + let mut comments: Vec<Option<String>> = Vec::with_capacity(n); + let mut update_times: Vec<i64> = Vec::with_capacity(n); + + for schema in &schemas { + schema_ids.push(schema.id()); + fields_json.push(to_json(schema.fields())?); + partition_keys_json.push(to_json(schema.partition_keys())?); + primary_keys_json.push(to_json(schema.primary_keys())?); + options_json.push(to_json(schema.options())?); + comments.push(schema.comment().map(str::to_string)); + update_times.push(schema.time_millis()); + } + + let schema = schemas_schema(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(schema_ids)), + Arc::new(StringArray::from(fields_json)), + Arc::new(StringArray::from(partition_keys_json)), + Arc::new(StringArray::from(primary_keys_json)), + Arc::new(StringArray::from(options_json)), + Arc::new(StringArray::from(comments)), + Arc::new(TimestampMillisecondArray::from(update_times)), + ], + )?; + + Ok(MemorySourceConfig::try_new_exec( + &[vec![batch]], + schema, + projection.cloned(), + )?) + } +} + +fn to_json<T: Serialize>(value: &T) -> DFResult<String> { + serde_json::to_string(value).map_err(|e| DataFusionError::External(Box::new(e))) +} diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs index c3292e39..188b0002 100644 --- a/crates/integrations/datafusion/tests/system_tables.rs +++ b/crates/integrations/datafusion/tests/system_tables.rs @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. -//! Paimon `$options` system table end-to-end via DataFusion SQL. +//! Paimon system tables end-to-end via DataFusion SQL.
use std::sync::Arc; -use datafusion::arrow::array::{Array, StringArray}; +use datafusion::arrow::array::{Array, Int64Array, StringArray}; +use datafusion::arrow::datatypes::{DataType, TimeUnit}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::prelude::SessionContext; use paimon::catalog::Identifier; @@ -129,6 +130,110 @@ async fn test_unknown_system_table_name_returns_not_found() { ); } +#[tokio::test] +async fn test_schemas_system_table() { + let (ctx, catalog, _tmp) = create_context().await; + let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$schemas"); + let batches = run_sql(&ctx, &sql).await; + + assert!(!batches.is_empty(), "$schemas should return ≥1 batch"); + + let arrow_schema = batches[0].schema(); + let expected_columns = [ + ("schema_id", DataType::Int64), + ("fields", DataType::Utf8), + ("partition_keys", DataType::Utf8), + ("primary_keys", DataType::Utf8), + ("options", DataType::Utf8), + ("comment", DataType::Utf8), + ( + "update_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + ), + ]; + for (i, (name, dtype)) in expected_columns.iter().enumerate() { + let field = arrow_schema.field(i); + assert_eq!(field.name(), name, "column {i} name"); + assert_eq!(field.data_type(), dtype, "column {i} type"); + } + + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); + let table = catalog + .get_table(&identifier) + .await + .expect("fixture table should load"); + let all_schemas = table + .schema_manager() + .list_all() + .await + .expect("list_all should succeed"); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, + all_schemas.len(), + "$schemas rows should match list_all() length" + ); + + let mut ids: Vec<i64> = Vec::new(); + for batch in &batches { + let col = batch + .column(0) + .as_any() + .downcast_ref::<Int64Array>() + .expect("schema_id is Int64"); + for i in 0..batch.num_rows() { + ids.push(col.value(i)); + } + } + let mut sorted = ids.clone();
+ sorted.sort_unstable(); + assert_eq!(ids, sorted, "schema_id column should be ascending"); + + // The last row's JSON columns must round-trip to the current schema. + let last_batch = batches.last().unwrap(); + let last_idx = last_batch.num_rows() - 1; + let latest = table.schema(); + let json_columns: [(usize, &str, String); 4] = [ + ( + 1, + "fields", + serde_json::to_string(latest.fields()).expect("serialize fields"), + ), + ( + 2, + "partition_keys", + serde_json::to_string(latest.partition_keys()).expect("serialize partition_keys"), + ), + ( + 3, + "primary_keys", + serde_json::to_string(latest.primary_keys()).expect("serialize primary_keys"), + ), + ( + 4, + "options", + serde_json::to_string(latest.options()).expect("serialize options"), + ), + ]; + for (col_idx, col_name, expected) in &json_columns { + let col = last_batch + .column(*col_idx) + .as_any() + .downcast_ref::<StringArray>() + .unwrap_or_else(|| panic!("column {col_name} is not Utf8")); + // Parse both sides before comparing: `options` is a HashMap whose + // JSON key order is non-deterministic across `HashMap` instances.
+ let actual: serde_json::Value = serde_json::from_str(col.value(last_idx)) + .unwrap_or_else(|e| panic!("parse actual `{col_name}`: {e}")); + let expected: serde_json::Value = serde_json::from_str(expected) + .unwrap_or_else(|e| panic!("parse expected `{col_name}`: {e}")); + assert_eq!( + actual, expected, + "latest-row `{col_name}` JSON should round-trip" + ); + } +} + #[tokio::test] async fn test_missing_base_table_for_system_table_errors() { let (ctx, _catalog, _tmp) = create_context().await; diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs index a061e8ab..e61324f3 100644 --- a/crates/paimon/src/catalog/filesystem.rs +++ b/crates/paimon/src/catalog/filesystem.rs @@ -26,7 +26,7 @@ use crate::common::{CatalogOptions, Options}; use crate::error::{ConfigInvalidSnafu, Error, Result}; use crate::io::FileIO; use crate::spec::{Schema, TableSchema}; -use crate::table::Table; +use crate::table::{SchemaManager, Table}; use async_trait::async_trait; use bytes::Bytes; use opendal::raw::get_basename; @@ -167,36 +167,8 @@ impl FileSystemCatalog { /// Load the latest schema for a table (highest schema-{version} file under table_path/schema). async fn load_latest_table_schema(&self, table_path: &str) -> Result<Option<TableSchema>> { - let schema_dir = self.schema_dir_path(table_path); - if !self.file_io.exists(&schema_dir).await? { - return Ok(None); - } - let statuses = self.file_io.list_status(&schema_dir).await?; - - let latest_schema_id = statuses - .into_iter() - .filter(|s| !s.is_dir) - .filter_map(|s| { - get_basename(s.path.as_str()) - .strip_prefix(SCHEMA_PREFIX)?
- .parse::<i64>() - .ok() - }) - .max(); - - if let Some(schema_id) = latest_schema_id { - let schema_path = self.schema_file_path(table_path, schema_id); - let input_file = self.file_io.new_input(&schema_path)?; - let content = input_file.read().await?; - let schema: TableSchema = - serde_json::from_slice(&content).map_err(|e| Error::DataInvalid { - message: format!("Failed to parse schema file: {schema_path}"), - source: Some(Box::new(e)), - })?; - return Ok(Some(schema)); - } - - Ok(None) + let manager = SchemaManager::new(self.file_io.clone(), table_path.to_string()); + Ok(manager.latest().await?.map(|arc| (*arc).clone())) } /// Save a table schema to a file. diff --git a/crates/paimon/src/table/schema_manager.rs b/crates/paimon/src/table/schema_manager.rs index 057dc3f1..61e3b81d 100644 --- a/crates/paimon/src/table/schema_manager.rs +++ b/crates/paimon/src/table/schema_manager.rs @@ -21,6 +21,8 @@ use crate::io::FileIO; use crate::spec::TableSchema; +use futures::future::try_join_all; +use opendal::raw::get_basename; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -65,6 +67,48 @@ impl SchemaManager { format!("{}/{}{}", self.schema_directory(), SCHEMA_PREFIX, schema_id) } + /// List all schema ids sorted ascending. Returns an empty vector if the + /// schema directory is missing or empty. + /// + /// Mirrors Java [SchemaManager.listAllIds()](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java). + pub async fn list_all_ids(&self) -> crate::Result<Vec<i64>> { + let mut ids: Vec<i64> = self + .file_io + .list_status(&self.schema_directory()) + .await? + .into_iter() + .filter(|s| !s.is_dir) + .filter_map(|s| { + get_basename(s.path.as_str()) + .strip_prefix(SCHEMA_PREFIX)? + .parse::<i64>() + .ok() + }) + .collect(); + ids.sort_unstable(); + Ok(ids) + } + + /// List all schemas sorted by id ascending.
+ /// + /// Mirrors Java [SchemaManager.listAll()](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java). + pub async fn list_all(&self) -> crate::Result<Vec<Arc<TableSchema>>> { + let ids = self.list_all_ids().await?; + try_join_all(ids.into_iter().map(|id| self.schema(id))).await + } + + /// Return the schema with the highest id, or `None` when no schema files + /// exist under the schema directory. + /// + /// Mirrors Java [SchemaManager.latest()](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java). + pub async fn latest(&self) -> crate::Result<Option<Arc<TableSchema>>> { + let ids = self.list_all_ids().await?; + match ids.last() { + Some(&max_id) => Ok(Some(self.schema(max_id).await?)), + None => Ok(None), + } + } + /// Load a schema by ID. Returns cached version if available. /// /// The cache is shared across all clones of this `SchemaManager`, so loading @@ -101,3 +145,114 @@ impl SchemaManager { Ok(schema) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::Schema; + use bytes::Bytes; + + fn memory_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + async fn write_schema_file(file_io: &FileIO, dir: &str, id: i64) { + let schema = Schema::builder().build().unwrap(); + let table_schema = TableSchema::new(id, &schema); + let json = serde_json::to_vec(&table_schema).unwrap(); + let path = format!("{dir}/{SCHEMA_PREFIX}{id}"); + let out = file_io.new_output(&path).unwrap(); + out.write(Bytes::from(json)).await.unwrap(); + } + + #[tokio::test] + async fn list_all_ids_returns_empty_for_missing_directory() { + let file_io = memory_file_io(); + let sm = SchemaManager::new(file_io, "memory:/list_missing".to_string()); + assert!(sm.list_all_ids().await.unwrap().is_empty()); + } + + #[tokio::test] + async fn list_all_ids_returns_empty_for_empty_directory() { + let file_io = memory_file_io(); + let
table_path = "memory:/list_empty"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + + let sm = SchemaManager::new(file_io, table_path.to_string()); + assert!(sm.list_all_ids().await.unwrap().is_empty()); + } + + #[tokio::test] + async fn list_all_ids_sorts_ascending() { + let file_io = memory_file_io(); + let table_path = "memory:/list_sorted"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + for id in [3, 0, 2, 1] { + write_schema_file(&file_io, &dir, id).await; + } + + let sm = SchemaManager::new(file_io, table_path.to_string()); + assert_eq!(sm.list_all_ids().await.unwrap(), vec![0, 1, 2, 3]); + } + + #[tokio::test] + async fn list_all_ids_ignores_unrelated_files() { + let file_io = memory_file_io(); + let table_path = "memory:/list_filter"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + write_schema_file(&file_io, &dir, 0).await; + // `schema-foo` starts with the prefix but is not an i64. + let junk = file_io + .new_output(&format!("{dir}/{SCHEMA_PREFIX}foo")) + .unwrap(); + junk.write(Bytes::from("{}")).await.unwrap(); + // A completely unrelated file. 
+ let other = file_io.new_output(&format!("{dir}/README")).unwrap(); + other.write(Bytes::from("hi")).await.unwrap(); + + let sm = SchemaManager::new(file_io, table_path.to_string()); + assert_eq!(sm.list_all_ids().await.unwrap(), vec![0]); + } + + #[tokio::test] + async fn list_all_loads_schemas_in_order() { + let file_io = memory_file_io(); + let table_path = "memory:/list_all_load"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + for id in [0, 2, 1] { + write_schema_file(&file_io, &dir, id).await; + } + + let sm = SchemaManager::new(file_io, table_path.to_string()); + let schemas = sm.list_all().await.unwrap(); + let ids: Vec<i64> = schemas.iter().map(|s| s.id()).collect(); + assert_eq!(ids, vec![0, 1, 2]); + } + + #[tokio::test] + async fn latest_returns_none_when_no_schemas() { + let file_io = memory_file_io(); + let sm = SchemaManager::new(file_io, "memory:/latest_none".to_string()); + assert!(sm.latest().await.unwrap().is_none()); + } + + #[tokio::test] + async fn latest_returns_max_id_schema() { + let file_io = memory_file_io(); + let table_path = "memory:/latest_max"; + let dir = format!("{table_path}/{SCHEMA_DIR}"); + file_io.mkdirs(&dir).await.unwrap(); + for id in [0, 5, 2] { + write_schema_file(&file_io, &dir, id).await; + } + + let sm = SchemaManager::new(file_io, table_path.to_string()); + let latest = sm.latest().await.unwrap().expect("latest"); + assert_eq!(latest.id(), 5); + } +}