Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/integrations/datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ chrono = "0.4"
datafusion = { workspace = true }
paimon = { path = "../../paimon" }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { workspace = true, features = ["rt", "time", "fs"] }

[dev-dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
flate2 = "1"
parquet = { workspace = true }
serde = "1"
serde_json = "1"
tar = "0.4"
tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
6 changes: 5 additions & 1 deletion crates/integrations/datafusion/src/system_tables/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use paimon::table::Table;
use crate::error::to_datafusion_error;

mod options;
mod schemas;

type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;

const TABLES: &[(&str, Builder)] = &[("options", options::build)];
const TABLES: &[(&str, Builder)] = &[("options", options::build), ("schemas", schemas::build)];

/// Parse a Paimon object name into `(base_table, optional system_table_name)`.
///
Expand Down Expand Up @@ -117,6 +118,9 @@ mod tests {
assert!(is_registered("options"));
assert!(is_registered("Options"));
assert!(is_registered("OPTIONS"));
assert!(is_registered("schemas"));
assert!(is_registered("Schemas"));
assert!(is_registered("SCHEMAS"));
assert!(!is_registered("nonsense"));
}

Expand Down
138 changes: 138 additions & 0 deletions crates/integrations/datafusion/src/system_tables/schemas.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Mirrors Java [SchemasTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java).

use std::any::Any;
use std::sync::{Arc, OnceLock};

use async_trait::async_trait;
use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion::catalog::Session;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::{DataFusionError, Result as DFResult};
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use paimon::table::Table;
use serde::Serialize;

use crate::error::to_datafusion_error;

/// Construct the `$schemas` system-table provider for the given base table.
///
/// Registered in the parent module's `TABLES` list; the returned provider
/// serves one row per historical schema version on scan.
pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
    let provider: Arc<dyn TableProvider> = Arc::new(SchemasTable { table });
    Ok(provider)
}

/// Arrow schema of the `$schemas` system table, built once and cached for
/// the life of the process.
///
/// Columns: `schema_id`, the JSON-encoded `fields` / `partition_keys` /
/// `primary_keys` / `options`, a nullable `comment`, and a
/// millisecond-precision `update_time`.
fn schemas_schema() -> SchemaRef {
    static CACHE: OnceLock<SchemaRef> = OnceLock::new();
    CACHE
        .get_or_init(|| {
            let columns = vec![
                Field::new("schema_id", DataType::Int64, false),
                Field::new("fields", DataType::Utf8, false),
                Field::new("partition_keys", DataType::Utf8, false),
                Field::new("primary_keys", DataType::Utf8, false),
                Field::new("options", DataType::Utf8, false),
                // `comment` is the only nullable column.
                Field::new("comment", DataType::Utf8, true),
                Field::new(
                    "update_time",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                ),
            ];
            Arc::new(Schema::new(columns))
        })
        .clone()
}

/// `TableProvider` that exposes a base table's schema history as rows;
/// the actual listing happens lazily in [`TableProvider::scan`].
#[derive(Debug)]
struct SchemasTable {
// Base Paimon table whose schema manager is queried on scan.
table: Table,
}

#[async_trait]
impl TableProvider for SchemasTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        schemas_schema()
    }

    fn table_type(&self) -> TableType {
        // System tables are read-only derived views, not base tables.
        TableType::View
    }

    /// Lists every historical schema of the base table and materializes
    /// one row per schema version into a single in-memory batch.
    ///
    /// Filters and limit are ignored (DataFusion applies them downstream);
    /// the projection is delegated to the memory source.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let all = self
            .table
            .schema_manager()
            .list_all()
            .await
            .map_err(to_datafusion_error)?;

        // One column builder per output column, sized up front.
        let capacity = all.len();
        let mut ids: Vec<i64> = Vec::with_capacity(capacity);
        let mut fields_col: Vec<String> = Vec::with_capacity(capacity);
        let mut partition_col: Vec<String> = Vec::with_capacity(capacity);
        let mut primary_col: Vec<String> = Vec::with_capacity(capacity);
        let mut options_col: Vec<String> = Vec::with_capacity(capacity);
        let mut comment_col: Vec<Option<String>> = Vec::with_capacity(capacity);
        let mut time_col: Vec<i64> = Vec::with_capacity(capacity);

        for s in &all {
            ids.push(s.id());
            fields_col.push(to_json(s.fields())?);
            partition_col.push(to_json(s.partition_keys())?);
            primary_col.push(to_json(s.primary_keys())?);
            options_col.push(to_json(s.options())?);
            comment_col.push(s.comment().map(str::to_string));
            time_col.push(s.time_millis());
        }

        let arrow_schema = schemas_schema();
        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(fields_col)),
                Arc::new(StringArray::from(partition_col)),
                Arc::new(StringArray::from(primary_col)),
                Arc::new(StringArray::from(options_col)),
                Arc::new(StringArray::from(comment_col)),
                Arc::new(TimestampMillisecondArray::from(time_col)),
            ],
        )?;

        let exec =
            MemorySourceConfig::try_new_exec(&[vec![batch]], arrow_schema, projection.cloned())?;
        Ok(exec)
    }
}

/// Serialize any serde-serializable value to a JSON string, wrapping a
/// serialization failure as a DataFusion external error.
fn to_json<T: Serialize + ?Sized>(value: &T) -> DFResult<String> {
    match serde_json::to_string(value) {
        Ok(json) => Ok(json),
        Err(e) => Err(DataFusionError::External(Box::new(e))),
    }
}
109 changes: 107 additions & 2 deletions crates/integrations/datafusion/tests/system_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
// specific language governing permissions and limitations
// under the License.

//! Paimon `$options` system table end-to-end via DataFusion SQL.
//! Paimon system tables end-to-end via DataFusion SQL.

use std::sync::Arc;

use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::array::{Array, Int64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::SessionContext;
use paimon::catalog::Identifier;
Expand Down Expand Up @@ -129,6 +130,110 @@ async fn test_unknown_system_table_name_returns_not_found() {
);
}

/// End-to-end: `SELECT * FROM <table>$schemas` must expose the schema
/// history with the documented column layout, a row count matching
/// `SchemaManager::list_all`, ascending schema ids, and JSON columns whose
/// latest row round-trips against the current in-memory schema.
#[tokio::test]
async fn test_schemas_system_table() {
let (ctx, catalog, _tmp) = create_context().await;
let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$schemas");
let batches = run_sql(&ctx, &sql).await;

assert!(!batches.is_empty(), "$schemas should return ≥1 batch");

// 1) Column names and types must match the fixed system-table schema.
let arrow_schema = batches[0].schema();
let expected_columns = [
("schema_id", DataType::Int64),
("fields", DataType::Utf8),
("partition_keys", DataType::Utf8),
("primary_keys", DataType::Utf8),
("options", DataType::Utf8),
("comment", DataType::Utf8),
(
"update_time",
DataType::Timestamp(TimeUnit::Millisecond, None),
),
];
for (i, (name, dtype)) in expected_columns.iter().enumerate() {
let field = arrow_schema.field(i);
assert_eq!(field.name(), name, "column {i} name");
assert_eq!(field.data_type(), dtype, "column {i} type");
}

// 2) Row count must equal the number of schema versions reported by the
//    schema manager for the same fixture table.
let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string());
let table = catalog
.get_table(&identifier)
.await
.expect("fixture table should load");
let all_schemas = table
.schema_manager()
.list_all()
.await
.expect("list_all should succeed");
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(
total_rows,
all_schemas.len(),
"$schemas rows should match list_all() length"
);

// 3) schema_id (column 0) must be monotonically ascending across batches.
let mut ids: Vec<i64> = Vec::new();
for batch in &batches {
let col = batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.expect("schema_id is Int64");
for i in 0..batch.num_rows() {
ids.push(col.value(i));
}
}
let mut sorted = ids.clone();
sorted.sort_unstable();
assert_eq!(ids, sorted, "schema_id column should be ascending");

// The last row's JSON columns must round-trip to the current schema.
let last_batch = batches.last().unwrap();
let last_idx = last_batch.num_rows() - 1;
let latest = table.schema();
// (column index, column name, expected JSON serialized from the latest schema)
let json_columns: [(usize, &str, String); 4] = [
(
1,
"fields",
serde_json::to_string(latest.fields()).expect("serialize fields"),
),
(
2,
"partition_keys",
serde_json::to_string(latest.partition_keys()).expect("serialize partition_keys"),
),
(
3,
"primary_keys",
serde_json::to_string(latest.primary_keys()).expect("serialize primary_keys"),
),
(
4,
"options",
serde_json::to_string(latest.options()).expect("serialize options"),
),
];
for (col_idx, col_name, expected) in &json_columns {
let col = last_batch
.column(*col_idx)
.as_any()
.downcast_ref::<StringArray>()
.unwrap_or_else(|| panic!("column {col_name} is not Utf8"));
// Parse both sides before comparing: `options` is a HashMap whose
// JSON key order is non-deterministic across `HashMap` instances.
let actual: serde_json::Value = serde_json::from_str(col.value(last_idx))
.unwrap_or_else(|e| panic!("parse actual `{col_name}`: {e}"));
let expected: serde_json::Value = serde_json::from_str(expected)
.unwrap_or_else(|e| panic!("parse expected `{col_name}`: {e}"));
assert_eq!(
actual, expected,
"latest-row `{col_name}` JSON should round-trip"
);
}
}

#[tokio::test]
async fn test_missing_base_table_for_system_table_errors() {
let (ctx, _catalog, _tmp) = create_context().await;
Expand Down
34 changes: 3 additions & 31 deletions crates/paimon/src/catalog/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::common::{CatalogOptions, Options};
use crate::error::{ConfigInvalidSnafu, Error, Result};
use crate::io::FileIO;
use crate::spec::{Schema, TableSchema};
use crate::table::Table;
use crate::table::{SchemaManager, Table};
use async_trait::async_trait;
use bytes::Bytes;
use opendal::raw::get_basename;
Expand Down Expand Up @@ -167,36 +167,8 @@ impl FileSystemCatalog {

/// Load the latest schema for a table (highest schema-{version} file under table_path/schema).
async fn load_latest_table_schema(&self, table_path: &str) -> Result<Option<TableSchema>> {
let schema_dir = self.schema_dir_path(table_path);
if !self.file_io.exists(&schema_dir).await? {
return Ok(None);
}
let statuses = self.file_io.list_status(&schema_dir).await?;

let latest_schema_id = statuses
.into_iter()
.filter(|s| !s.is_dir)
.filter_map(|s| {
get_basename(s.path.as_str())
.strip_prefix(SCHEMA_PREFIX)?
.parse::<i64>()
.ok()
})
.max();

if let Some(schema_id) = latest_schema_id {
let schema_path = self.schema_file_path(table_path, schema_id);
let input_file = self.file_io.new_input(&schema_path)?;
let content = input_file.read().await?;
let schema: TableSchema =
serde_json::from_slice(&content).map_err(|e| Error::DataInvalid {
message: format!("Failed to parse schema file: {schema_path}"),
source: Some(Box::new(e)),
})?;
return Ok(Some(schema));
}

Ok(None)
let manager = SchemaManager::new(self.file_io.clone(), table_path.to_string());
Ok(manager.latest().await?.map(|arc| (*arc).clone()))
}

/// Save a table schema to a file.
Expand Down
Loading
Loading