Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ arrow-buffer = "58.0"
arrow-schema = "58.0"
arrow-cast = "58.0"
arrow-ord = "58.0"
arrow-row = "58.0"
arrow-select = "58.0"
datafusion = "53.0.0"
datafusion-ffi = "53.0.0"
Expand Down
27 changes: 0 additions & 27 deletions crates/integration_tests/tests/append_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,33 +563,6 @@ async fn test_partitioned_fixed_bucket_write_read() {
assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30, 40]);
}

// ---------------------------------------------------------------------------
// Unsupported: primary key table should be rejected
// ---------------------------------------------------------------------------

/// Attempting to open a writer on a table declared with primary keys must be
/// rejected: `new_write()` should return an `Unsupported` error rather than a
/// usable writer.
#[tokio::test]
async fn test_reject_primary_key_table() {
    // Two int columns, with "id" declared as the primary key.
    let pk_schema = Schema::builder()
        .column("id", DataType::Int(IntType::new()))
        .column("value", DataType::Int(IntType::new()))
        .primary_key(["id"])
        .build()
        .unwrap();

    let file_io = memory_file_io();
    let table = make_table(
        &file_io,
        "memory:/append_reject_pk",
        TableSchema::new(0, &pk_schema),
    );

    // Constructing the write itself is the step expected to fail.
    let result = table.new_write_builder().new_write();
    assert!(result.is_err());
    let err = result.err().unwrap();
    assert!(
        matches!(&err, paimon::Error::Unsupported { message } if message.contains("primary keys")),
        "Expected Unsupported error for PK table, got: {err:?}"
    );
}

#[tokio::test]
async fn test_reject_fixed_bucket_without_bucket_key() {
let schema = Schema::builder()
Expand Down
75 changes: 66 additions & 9 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1478,9 +1478,9 @@ async fn test_read_complex_type_table() {
// PK-without-DV and non-PK-with-DV tests
// ---------------------------------------------------------------------------

/// Reading a primary-key table without deletion vectors should return an Unsupported error.
/// Reading a primary-key table without deletion vectors should work via sort-merge reader.
#[tokio::test]
async fn test_read_pk_table_without_dv_returns_error() {
async fn test_read_pk_table_without_dv_via_sort_merge() {
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, "simple_pk_table").await;

Expand All @@ -1493,16 +1493,73 @@ async fn test_read_pk_table_without_dv_returns_error() {
);

let read = table.new_read_builder().new_read();
let result = read
let stream = read
.expect("new_read should succeed")
.to_arrow(plan.splits());
let err = result
.err()
.expect("Reading PK table without DV should fail");
.to_arrow(plan.splits())
.expect("to_arrow should succeed for PK table via sort-merge");

let batches: Vec<_> = futures::TryStreamExt::try_collect(stream)
.await
.expect("Reading PK table without DV should succeed via sort-merge reader");
assert!(
matches!(&err, Error::Unsupported { message } if message.contains("primary-key")),
"Expected Unsupported error about primary-key tables, got: {err:?}"
!batches.is_empty(),
"PK table read should return non-empty results"
);

let actual = extract_id_name(&batches);
let expected = vec![
(1, "alice".to_string()),
(2, "bob".to_string()),
(3, "carol".to_string()),
];
assert_eq!(
actual, expected,
"PK table without DV should return correct rows via sort-merge reader"
);
}

/// Reading a first-row merge engine PK table should return only the first-inserted row per key.
/// The table has been compacted so all files are level > 0, and the scan skips level-0 files.
/// Reading a first-row merge engine PK table should return only the first-inserted row per key.
/// The table has been compacted so all files are level > 0, and the scan skips level-0 files.
#[tokio::test]
async fn test_read_first_row_pk_table() {
    let catalog = create_file_system_catalog();
    let table = get_table_from_catalog(&catalog, "first_row_pk_table").await;

    // Plan the scan and confirm there is data to read before asserting on contents.
    let plan = table
        .new_read_builder()
        .new_scan()
        .plan()
        .await
        .expect("Failed to plan scan");
    assert!(
        !plan.splits().is_empty(),
        "first-row PK table should have splits to read"
    );

    // Materialize the Arrow stream over every planned split.
    let stream = table
        .new_read_builder()
        .new_read()
        .expect("new_read should succeed")
        .to_arrow(plan.splits())
        .expect("to_arrow should succeed for first-row PK table");
    let batches: Vec<_> = futures::TryStreamExt::try_collect(stream)
        .await
        .expect("Reading first-row PK table should succeed");
    assert!(
        !batches.is_empty(),
        "first-row PK table read should return non-empty results"
    );

    // first-row keeps the earliest row per key:
    //   commit 1: (1, alice), (2, bob), (3, carol)
    //   commit 2: (2, bob-v2), (3, carol-v2), (4, dave) — id=2,3 ignored, id=4 is new
    let expected: Vec<_> = [(1, "alice"), (2, "bob"), (3, "carol"), (4, "dave")]
        .into_iter()
        .map(|(id, name)| (id, name.to_string()))
        .collect();
    assert_eq!(
        extract_id_name(&batches),
        expected,
        "first-row PK table should keep earliest row per key"
    );
}

Expand Down
Loading
Loading