diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs index e2f60fc4..37f907bb 100644 --- a/crates/paimon/src/arrow/mod.rs +++ b/crates/paimon/src/arrow/mod.rs @@ -17,11 +17,8 @@ pub(crate) mod filtering; pub(crate) mod format; -mod reader; pub(crate) mod schema_evolution; -pub use crate::arrow::reader::ArrowReaderBuilder; - use crate::spec::{ ArrayType, BigIntType, BooleanType, DataField, DataType as PaimonDataType, DateType, DecimalType, DoubleType, FloatType, IntType, LocalZonedTimestampType, MapType, RowType, diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs deleted file mode 100644 index 3f19bd84..00000000 --- a/crates/paimon/src/arrow/reader.rs +++ /dev/null @@ -1,945 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::arrow::build_target_arrow_schema; -use crate::arrow::format::create_format_reader; -use crate::arrow::schema_evolution::{create_index_mapping, NULL_FIELD_INDEX}; -use crate::deletion_vector::{DeletionVector, DeletionVectorFactory}; -use crate::io::FileIO; -use crate::spec::{DataField, DataFileMeta, Predicate, ROW_ID_FIELD_NAME}; -use crate::table::schema_manager::SchemaManager; -use crate::table::ArrowRecordBatchStream; -use crate::table::RowRange; -use crate::{DataSplit, Error}; -use arrow_array::{Array, Int64Array, RecordBatch}; -use arrow_cast::cast; - -use async_stream::try_stream; -use futures::StreamExt; -use std::collections::HashMap; -use std::sync::Arc; - -/// Builder to create ArrowReader -pub struct ArrowReaderBuilder { - batch_size: Option, - file_io: FileIO, - schema_manager: SchemaManager, - table_schema_id: i64, - predicates: Vec, - table_fields: Vec, -} - -impl ArrowReaderBuilder { - /// Create a new ArrowReaderBuilder - pub(crate) fn new( - file_io: FileIO, - schema_manager: SchemaManager, - table_schema_id: i64, - ) -> Self { - ArrowReaderBuilder { - batch_size: None, - file_io, - schema_manager, - table_schema_id, - predicates: Vec::new(), - table_fields: Vec::new(), - } - } - - /// Set data predicates used for Parquet row-group pruning and partial - /// decode-time filtering. - pub(crate) fn with_predicates(mut self, predicates: Vec) -> Self { - self.predicates = predicates; - self - } - - /// Set the full table schema fields used for filter-to-file field mapping. - pub(crate) fn with_table_fields(mut self, table_fields: Vec) -> Self { - self.table_fields = table_fields; - self - } - - /// Build the ArrowReader with the given read type (logical row type or projected subset). - /// Used to clip Parquet schema to requested columns only. 
- pub fn build(self, read_type: Vec) -> ArrowReader { - ArrowReader { - batch_size: self.batch_size, - file_io: self.file_io, - schema_manager: self.schema_manager, - table_schema_id: self.table_schema_id, - predicates: self.predicates, - table_fields: self.table_fields, - read_type, - } - } -} - -/// Reads data from Parquet files -#[derive(Clone)] -pub struct ArrowReader { - batch_size: Option, - file_io: FileIO, - schema_manager: SchemaManager, - table_schema_id: i64, - predicates: Vec, - table_fields: Vec, - read_type: Vec, -} - -impl ArrowReader { - /// Take a stream of DataSplits and read every data file in each split. - /// Returns a stream of Arrow RecordBatches from all files. - /// - /// Uses SchemaManager to load the data file's schema (via `DataFileMeta.schema_id`) - /// and computes field-ID-based index mapping for schema evolution (added columns, - /// type promotion, column reordering). - /// - /// Matches [RawFileSplitRead.createReader](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java). - pub fn read(self, data_splits: &[DataSplit]) -> crate::Result { - let file_io = self.file_io.clone(); - let batch_size = self.batch_size; - let splits: Vec = data_splits.to_vec(); - let read_type = self.read_type; - let predicates = self.predicates; - let table_fields = self.table_fields; - let schema_manager = self.schema_manager; - let table_schema_id = self.table_schema_id; - Ok(try_stream! { - for split in splits { - // Create DV factory for this split only. - let dv_factory = if split - .data_deletion_files() - .is_some_and(|files| files.iter().any(Option::is_some)) - { - Some( - DeletionVectorFactory::new( - &file_io, - split.data_files(), - split.data_deletion_files(), - ) - .await?, - ) - } else { - None - }; - - for file_meta in split.data_files().to_vec() { - let dv = dv_factory - .as_ref() - .and_then(|factory| factory.get_deletion_vector(&file_meta.file_name)) - .cloned(); - - // Load data file's schema if it differs from the table schema. - let data_fields: Option> = if file_meta.schema_id != table_schema_id { - let data_schema = schema_manager.schema(file_meta.schema_id).await?; - Some(data_schema.fields().to_vec()) - } else { - None - }; - - let mut stream = read_single_file_stream( - file_io.clone(), - SingleFileReadRequest { - split: split.clone(), - file_meta, - read_type: read_type.clone(), - table_fields: table_fields.clone(), - data_fields, - predicates: predicates.clone(), - batch_size, - dv, - row_ranges: None, - }, - )?; - while let Some(batch) = stream.next().await { - yield batch?; - } - } - } - } - .boxed()) - } - - /// Read data files in data evolution mode, merging columns from files that share the same row ID range. - /// - /// Each DataSplit contains files grouped by `first_row_id`. Files within a split may contain - /// different columns for the same logical rows. This method reads each file and merges them - /// column-wise, respecting `max_sequence_number` for conflict resolution. - /// - /// `table_fields` is the full table schema fields, used to determine which columns each file - /// provides when `write_cols` is not set. 
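The field-ID index mapping mentioned in the `read` doc comment above is easier to see in isolation. The following is a minimal, self-contained sketch of the idea, simplified to bare field IDs; it is not the crate's actual `create_index_mapping` from `arrow::schema_evolution`, which works on `DataField`s:

```rust
/// Sentinel meaning "no source column in the data file; fill with nulls".
const NULL_FIELD_INDEX: i32 = -1;

/// For each requested field ID, find the index of the matching field in the
/// data file's schema, or NULL_FIELD_INDEX if the file predates the column.
/// None means the mapping is the identity and no reshuffling is needed.
fn create_index_mapping(read_ids: &[i32], data_ids: &[i32]) -> Option<Vec<i32>> {
    let mapping: Vec<i32> = read_ids
        .iter()
        .map(|rid| {
            data_ids
                .iter()
                .position(|did| did == rid)
                .map(|idx| idx as i32)
                .unwrap_or(NULL_FIELD_INDEX)
        })
        .collect();
    if mapping.iter().enumerate().all(|(i, &m)| m == i as i32) {
        None
    } else {
        Some(mapping)
    }
}

fn main() {
    // Read fields 0, 2, 1; the file was written before field 2 existed and
    // stores its columns in the order [1, 0].
    assert_eq!(
        create_index_mapping(&[0, 2, 1], &[1, 0]),
        Some(vec![1, NULL_FIELD_INDEX, 0])
    );
    // Same layout on both sides: identity, no remapping required.
    assert_eq!(create_index_mapping(&[0, 1], &[0, 1]), None);
}
```

Matching by ID rather than by name is what makes renames and reorders safe: the ID is stable across schema versions even when the column's name or position is not.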
- pub fn read_data_evolution( - self, - data_splits: &[DataSplit], - table_fields: &[DataField], - ) -> crate::Result { - let file_io = self.file_io.clone(); - let batch_size = self.batch_size; - let splits: Vec = data_splits.to_vec(); - let read_type = self.read_type; - let table_fields: Vec = table_fields.to_vec(); - let schema_manager = self.schema_manager; - let table_schema_id = self.table_schema_id; - - let row_id_index = read_type.iter().position(|f| f.name() == ROW_ID_FIELD_NAME); - let file_read_type: Vec = read_type - .iter() - .filter(|f| f.name() != ROW_ID_FIELD_NAME) - .cloned() - .collect(); - let output_schema = build_target_arrow_schema(&read_type)?; - - Ok(try_stream! { - for split in splits { - let row_ranges = split.row_ranges().map(|r| r.to_vec()); - - if split.raw_convertible() || split.data_files().len() == 1 { - for file_meta in split.data_files().to_vec() { - let data_fields: Option> = if file_meta.schema_id != table_schema_id { - let data_schema = schema_manager.schema(file_meta.schema_id).await?; - Some(data_schema.fields().to_vec()) - } else { - None - }; - - let has_row_id = file_meta.first_row_id.is_some(); - let effective_row_ranges = if has_row_id { row_ranges.clone() } else { None }; - - let selected_row_ids = if row_id_index.is_some() && has_row_id { - effective_row_ranges.as_ref().map(|ranges| { - expand_selected_row_ids( - file_meta.first_row_id.unwrap(), - file_meta.row_count, - ranges, - ) - }) - } else { - None - }; - let file_base_row_id = file_meta.first_row_id.unwrap_or(0); - let mut row_id_cursor = file_base_row_id; - let mut row_id_offset: usize = 0; - - let mut stream = read_single_file_stream( - file_io.clone(), - SingleFileReadRequest { - split: split.clone(), - file_meta, - read_type: file_read_type.clone(), - table_fields: table_fields.clone(), - data_fields, - predicates: Vec::new(), - batch_size, - dv: None, - row_ranges: effective_row_ranges, - }, - )?; - while let Some(batch) = stream.next().await { - let batch = batch?; - let num_rows = batch.num_rows(); - if let Some(idx) = row_id_index { - if !has_row_id { - yield append_null_row_id_column(batch, idx, &output_schema)?; - } else if let Some(ref ids) = selected_row_ids { - yield attach_row_id(batch, idx, ids, &mut row_id_offset, &output_schema)?; - } else { - let row_ids: Vec = (row_id_cursor..row_id_cursor + num_rows as i64).collect(); - row_id_cursor += num_rows as i64; - let array: Arc = Arc::new(Int64Array::from(row_ids)); - yield insert_column_at(batch, array, idx, &output_schema)?; - } - } else { - yield batch; - } - } - } - } else { - let files = split.data_files(); - assert!( - files.iter().all(|f| f.first_row_id.is_some()), - "All files in a field merge split should have first_row_id" - ); - assert!( - files.iter().all(|f| f.row_count == files[0].row_count), - "All files in a field merge split should have the same row count" - ); - assert!( - files.iter().all(|f| f.first_row_id == files[0].first_row_id), - "All files in a field merge split should have the same first row id" - ); - - let group_base_row_id = files - .iter() - .filter_map(|f| f.first_row_id) - .min(); - let has_group_row_id = group_base_row_id.is_some(); - let group_row_count = files.iter().map(|f| f.row_count).max().unwrap_or(0); - let effective_row_ranges = if has_group_row_id { row_ranges.clone() } else { None }; - - let selected_row_ids = if row_id_index.is_some() && has_group_row_id { - effective_row_ranges.as_ref().map(|ranges| { - expand_selected_row_ids( - group_base_row_id.unwrap(), - group_row_count, - 
ranges, - ) - }) - } else { - None - }; - let mut row_id_cursor = group_base_row_id.unwrap_or(0); - let mut row_id_offset: usize = 0; - - let mut merge_stream = merge_files_by_columns( - &file_io, - &split, - &file_read_type, - &table_fields, - schema_manager.clone(), - table_schema_id, - batch_size, - effective_row_ranges, - )?; - while let Some(batch) = merge_stream.next().await { - let batch = batch?; - let num_rows = batch.num_rows(); - if let Some(idx) = row_id_index { - if !has_group_row_id { - yield append_null_row_id_column(batch, idx, &output_schema)?; - } else if let Some(ref ids) = selected_row_ids { - yield attach_row_id(batch, idx, ids, &mut row_id_offset, &output_schema)?; - } else { - let row_ids: Vec = (row_id_cursor..row_id_cursor + num_rows as i64).collect(); - row_id_cursor += num_rows as i64; - let array: Arc = Arc::new(Int64Array::from(row_ids)); - yield insert_column_at(batch, array, idx, &output_schema)?; - } - } else { - yield batch; - } - } - } - } - } - .boxed()) - } -} - -struct SingleFileReadRequest { - split: DataSplit, - file_meta: DataFileMeta, - read_type: Vec, - table_fields: Vec, - data_fields: Option>, - predicates: Vec, - batch_size: Option, - dv: Option>, - row_ranges: Option>, -} - -/// Read a single parquet file from a split, returning a lazy stream of batches. -/// Optionally applies a deletion vector. -/// -/// Handles schema evolution using field-ID-based index mapping: -/// - `data_fields`: if `Some`, the fields from the data file's schema (loaded via SchemaManager). -/// Used to compute index mapping between `read_type` and data fields by field ID. -/// - Columns missing from the file are filled with null arrays. -/// - Columns whose Arrow type differs from the target type are cast (type promotion). -/// -/// Reference: [RawFileSplitRead.createFileReader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java) -fn read_single_file_stream( - file_io: FileIO, - request: SingleFileReadRequest, -) -> crate::Result { - let SingleFileReadRequest { - split, - file_meta, - read_type, - table_fields, - data_fields, - predicates, - batch_size, - dv, - row_ranges, - } = request; - - let target_schema = build_target_arrow_schema(&read_type)?; - let file_fields = data_fields.clone().unwrap_or_else(|| table_fields.clone()); - - // Compute index mapping and determine which columns to read from the file. - // If data_fields is provided, use field-ID-based mapping; otherwise use read_type names directly. - let (projected_read_fields, index_mapping) = if let Some(ref df) = data_fields { - let mapping = create_index_mapping(&read_type, df); - match mapping { - Some(ref idx_map) => { - // Only read data fields that are referenced by the index mapping. - // Dedup by data field index to avoid duplicate column projections. - let mut seen = std::collections::HashSet::new(); - let fields_to_read: Vec = idx_map - .iter() - .filter(|&&idx| idx != NULL_FIELD_INDEX && seen.insert(idx)) - .map(|&idx| df[idx as usize].clone()) - .collect(); - (fields_to_read, Some(idx_map.clone())) - } - None => { - // Identity mapping — read data fields in order. - (df.clone(), None) - } - } - } else { - // No schema evolution — read by read_type names. - (read_type.clone(), None) - }; - - // Remap predicates from table-level to file-level indices. 
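Before reading, predicates written against the current table schema must be re-pointed at the file's (possibly older) column layout, which is what the remapping step below does. A hedged stand-alone sketch of that translation: the real logic lives in `crate::arrow::filtering::remap_predicates_to_file` and works on the crate's `Predicate` type, while `Pred` here is an invented stand-in that only carries the column index being rewritten.

```rust
#[derive(Debug, PartialEq)]
struct Pred {
    col_index: usize, // index into the schema the predicate was written for
}

/// Rewrite table-schema column indices into file-schema indices by name,
/// dropping predicates on columns the file does not contain.
fn remap_to_file(preds: &[Pred], table_cols: &[&str], file_cols: &[&str]) -> Vec<Pred> {
    preds
        .iter()
        .filter_map(|p| {
            let name = *table_cols.get(p.col_index)?;
            let col_index = file_cols.iter().position(|&f| f == name)?;
            Some(Pred { col_index })
        })
        .collect()
}

fn main() {
    // Predicates on the table's columns "x" and "b"; the old file stores "x"
    // first and never had a column "b".
    let preds = vec![Pred { col_index: 2 }, Pred { col_index: 1 }];
    let remapped = remap_to_file(&preds, &["a", "b", "x"], &["x", "a"]);
    assert_eq!(remapped, vec![Pred { col_index: 0 }]); // "x" -> file index 0; "b" dropped
}
```

Dropping unmappable predicates is safe here because they are only used for pruning; residual filtering stays at the query layer.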
- let file_predicates = { - let remapped = crate::arrow::filtering::remap_predicates_to_file( - &predicates, - &table_fields, - &file_fields, - ); - if remapped.is_empty() { - None - } else { - Some(crate::arrow::format::FilePredicates { - predicates: remapped, - file_fields: file_fields.clone(), - }) - } - }; - - Ok(try_stream! { - let path_to_read = split.data_file_path(&file_meta); - let format_reader = create_format_reader(&path_to_read)?; - let input_file = file_io.new_input(&path_to_read)?; - let file_reader = input_file.reader().await?; - let local_ranges = row_ranges.as_ref().map(|ranges| { - to_local_row_ranges(ranges, file_meta.first_row_id.unwrap_or(0), file_meta.row_count) - }); - - let row_selection = merge_row_selection( - file_meta.row_count, - dv.as_deref(), - local_ranges.as_deref(), - ); - - let mut batch_stream = format_reader.read_batch_stream( - Box::new(file_reader), - file_meta.file_size as u64, - &projected_read_fields, - file_predicates.as_ref(), - batch_size, - row_selection, - ).await?; - - while let Some(batch) = batch_stream.next().await { - let batch = batch?; - let num_rows = batch.num_rows(); - let batch_schema = batch.schema(); - - // Build output columns using index mapping (field-ID-based) or by name. - let mut columns: Vec> = Vec::with_capacity(target_schema.fields().len()); - for (i, target_field) in target_schema.fields().iter().enumerate() { - let source_col = if let Some(ref idx_map) = index_mapping { - let data_idx = idx_map[i]; - if data_idx == NULL_FIELD_INDEX { - None - } else { - // Find the column in the batch by the data field's name. - let data_field = &data_fields.as_ref().unwrap()[data_idx as usize]; - batch_schema - .index_of(data_field.name()) - .ok() - .map(|col_idx| batch.column(col_idx)) - } - } else if let Some(ref df) = data_fields { - // Identity mapping with data_fields present (e.g. renamed column). - // Use data field name (old name in parquet) at the same position. - batch_schema - .index_of(df[i].name()) - .ok() - .map(|col_idx| batch.column(col_idx)) - } else { - // No schema evolution — look up by target field name. - batch_schema - .index_of(target_field.name()) - .ok() - .map(|col_idx| batch.column(col_idx)) - }; - - match source_col { - Some(col) => { - if col.data_type() == target_field.data_type() { - columns.push(col.clone()); - } else { - // Type promotion: cast to target type. - let casted = cast(col, target_field.data_type()).map_err(|e| { - Error::UnexpectedError { - message: format!( - "Failed to cast column '{}' from {:?} to {:?}: {e}", - target_field.name(), - col.data_type(), - target_field.data_type() - ), - source: Some(Box::new(e)), - } - })?; - columns.push(casted); - } - } - None => { - // Column missing from file: fill with nulls. - let null_array = arrow_array::new_null_array(target_field.data_type(), num_rows); - columns.push(null_array); - } - } - } - - let result = if columns.is_empty() { - RecordBatch::try_new_with_options( - target_schema.clone(), - columns, - &arrow_array::RecordBatchOptions::new().with_row_count(Some(num_rows)), - ) - } else { - RecordBatch::try_new(target_schema.clone(), columns) - } - .map_err(|e| { - Error::UnexpectedError { - message: format!("Failed to build schema-evolved RecordBatch: {e}"), - source: Some(Box::new(e)), - } - })?; - yield result; - } - } - .boxed()) -} - -/// Merge multiple files column-wise for data evolution, streaming with bounded memory. 
-/// -/// Uses field IDs (not column names) to resolve which file provides which column, -/// ensuring correctness across schema evolution (column rename, add, drop). -/// -/// Opens all file readers simultaneously and maintains a cursor (current batch + offset) -/// per file. Each poll slices up to `batch_size` rows from each file's current batch, -/// assembles columns from the winning files, and yields the merged batch. When a file's -/// current batch is exhausted, the next batch is read from its stream on demand. -#[allow(clippy::too_many_arguments)] -fn merge_files_by_columns( - file_io: &FileIO, - split: &DataSplit, - read_type: &[DataField], - table_fields: &[DataField], - schema_manager: SchemaManager, - table_schema_id: i64, - batch_size: Option, - row_ranges: Option>, -) -> crate::Result { - let data_files = split.data_files(); - if data_files.is_empty() { - return Ok(futures::stream::empty().boxed()); - } - - // Build owned data for the stream closure. - let file_io = file_io.clone(); - let split = split.clone(); - let data_files: Vec = data_files.to_vec(); - let read_type = read_type.to_vec(); - let table_fields = table_fields.to_vec(); - let output_batch_size = batch_size.unwrap_or(1024); - let target_schema = build_target_arrow_schema(&read_type)?; - - Ok(try_stream! { - // Pre-load schemas and collect field IDs + data_fields per file. - // file_idx -> (field_ids, Option>) - let mut file_info: HashMap, Option>)> = HashMap::new(); - - for (file_idx, file_meta) in data_files.iter().enumerate() { - let (field_ids, data_fields) = if file_meta.schema_id != table_schema_id { - let file_schema = schema_manager.schema(file_meta.schema_id).await?; - let file_fields = file_schema.fields(); - - let ids: Vec = if let Some(ref wc) = file_meta.write_cols { - // write_cols names are from the file's schema at write time. - wc.iter() - .filter_map(|name| file_fields.iter().find(|f| f.name() == name).map(|f| f.id())) - .collect() - } else { - file_fields.iter().map(|f| f.id()).collect() - }; - - (ids, Some(file_fields.to_vec())) - } else { - let ids: Vec = if let Some(ref wc) = file_meta.write_cols { - // write_cols names are from the current table schema. - wc.iter() - .filter_map(|name| table_fields.iter().find(|f| f.name() == name).map(|f| f.id())) - .collect() - } else { - table_fields.iter().map(|f| f.id()).collect() - }; - - (ids, None) - }; - - file_info.insert(file_idx, (field_ids, data_fields)); - } - - // Determine which file provides each field ID, resolving conflicts by max_sequence_number. - // field_id -> (file_index, max_sequence_number) - let mut field_id_source: HashMap = HashMap::new(); - for (file_idx, file_meta) in data_files.iter().enumerate() { - let (ref field_ids, _) = file_info[&file_idx]; - for &fid in field_ids { - let entry = field_id_source - .entry(fid) - .or_insert((file_idx, i64::MIN)); - if file_meta.max_sequence_number > entry.1 { - *entry = (file_idx, file_meta.max_sequence_number); - } - } - } - - // For each projected field, determine which file provides it (by field ID). - // file_index -> Vec (target column names) - let mut file_read_columns: HashMap> = HashMap::new(); - for field in &read_type { - if let Some(&(file_idx, _)) = field_id_source.get(&field.id()) { - file_read_columns - .entry(file_idx) - .or_default() - .push(field.name().to_string()); - } - } - - // For each projected field, record (file_index, target_column_name) for assembly. 
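Worked through with concrete (invented) metadata, the `max_sequence_number` conflict-resolution rule reads like this; the real loop operates on `DataFileMeta`, and this sketch keeps only the three values that matter:

```rust
use std::collections::HashMap;

fn main() {
    // (file_index, field_ids, max_sequence_number) -- values are illustrative.
    let files = [
        (0usize, vec![1, 7], 10i64), // original write of fields 1 and 7
        (1usize, vec![7], 25i64),    // later column update rewrote field 7
    ];

    // field_id -> (winning file index, its max_sequence_number)
    let mut field_id_source: HashMap<i32, (usize, i64)> = HashMap::new();
    for (file_idx, field_ids, max_seq) in &files {
        for &fid in field_ids {
            let entry = field_id_source.entry(fid).or_insert((*file_idx, i64::MIN));
            if *max_seq > entry.1 {
                *entry = (*file_idx, *max_seq);
            }
        }
    }

    assert_eq!(field_id_source[&1].0, 0); // only file 0 carries field 1
    assert_eq!(field_id_source[&7].0, 1); // file 1 wins field 7 (seq 25 > 10)
}
```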
- let column_plan: Vec<(Option, String)> = read_type - .iter() - .map(|field| { - let file_idx = field_id_source.get(&field.id()).map(|&(idx, _)| idx); - (file_idx, field.name().to_string()) - }) - .collect(); - - // Collect which file indices we need to open streams for. - let active_file_indices: Vec = file_read_columns.keys().copied().collect(); - - // Edge case: if no file provides any projected column (e.g. SELECT on a newly added - // column that no file contains yet), we still need to emit NULL-filled rows to - // preserve the correct row count. - if active_file_indices.is_empty() { - let first_row_id = data_files[0].first_row_id.unwrap_or(0); - let file_row_count = data_files[0].row_count; - let total_rows = match &row_ranges { - Some(ranges) => expand_selected_row_ids(first_row_id, file_row_count, ranges).len(), - None => file_row_count as usize, - }; - let mut emitted = 0; - while emitted < total_rows { - let rows_to_emit = (total_rows - emitted).min(output_batch_size); - let columns: Vec> = target_schema - .fields() - .iter() - .map(|f| arrow_array::new_null_array(f.data_type(), rows_to_emit)) - .collect(); - let batch = if columns.is_empty() { - RecordBatch::try_new_with_options( - target_schema.clone(), - columns, - &arrow_array::RecordBatchOptions::new().with_row_count(Some(rows_to_emit)), - ) - } else { - RecordBatch::try_new(target_schema.clone(), columns) - } - .map_err(|e| Error::UnexpectedError { - message: format!("Failed to build NULL-filled RecordBatch: {e}"), - source: Some(Box::new(e)), - })?; - emitted += rows_to_emit; - yield batch; - } - } else { - - // Open a stream for each active file. - // Build per-file read_type: only the DataFields this file is responsible for. - let mut file_streams: HashMap = HashMap::new(); - for &file_idx in &active_file_indices { - let file_cols = file_read_columns.get(&file_idx).cloned().unwrap_or_default(); - let file_read_type: Vec = file_cols - .iter() - .filter_map(|col_name| read_type.iter().find(|f| f.name() == col_name).cloned()) - .collect(); - - let (_, ref data_fields) = file_info[&file_idx]; - - let stream = read_single_file_stream( - file_io.clone(), - SingleFileReadRequest { - split: split.clone(), - file_meta: data_files[file_idx].clone(), - read_type: file_read_type, - table_fields: table_fields.clone(), - data_fields: data_fields.clone(), - predicates: Vec::new(), - batch_size, - dv: None, - row_ranges: row_ranges.clone(), - }, - )?; - file_streams.insert(file_idx, stream); - } - - // Per-file cursor: current batch + offset within it. - let mut file_cursors: HashMap = HashMap::new(); - - loop { - // Ensure each active file has a current batch. If a file's cursor is exhausted - // or not yet initialized, read the next batch from its stream. - for &file_idx in &active_file_indices { - let needs_next = match file_cursors.get(&file_idx) { - None => true, - Some((batch, offset)) => *offset >= batch.num_rows(), - }; - if needs_next { - file_cursors.remove(&file_idx); - if let Some(stream) = file_streams.get_mut(&file_idx) { - if let Some(batch_result) = stream.next().await { - let batch = batch_result?; - if batch.num_rows() > 0 { - file_cursors.insert(file_idx, (batch, 0)); - } - } - } - } - } - - // All active files must have a cursor to assemble a valid row. - // If any file has no cursor (stream exhausted), we're done. - if active_file_indices.iter().any(|idx| !file_cursors.contains_key(idx)) { - break; - } - - // Determine how many rows we can emit: min of remaining rows across all files. 
- let remaining: usize = active_file_indices - .iter() - .map(|idx| { - let (batch, offset) = file_cursors.get(idx).unwrap(); - batch.num_rows() - offset - }) - .min() - .unwrap_or(0); - - if remaining == 0 { - break; - } - - let rows_to_emit = remaining.min(output_batch_size); - - // Slice each file's current batch and assemble columns. - // Use the target schema so that missing columns are null-filled. - let mut columns: Vec> = - Vec::with_capacity(column_plan.len()); - - for (i, (file_idx_opt, col_name)) in column_plan.iter().enumerate() { - let target_field = &target_schema.fields()[i]; - let col = file_idx_opt - .and_then(|file_idx| file_cursors.get(&file_idx)) - .and_then(|(batch, offset)| { - batch - .schema() - .index_of(col_name) - .ok() - .map(|col_idx| batch.column(col_idx).slice(*offset, rows_to_emit)) - }); - - columns.push(col.unwrap_or_else(|| { - arrow_array::new_null_array(target_field.data_type(), rows_to_emit) - })); - } - - // Advance all cursors. - for &file_idx in &active_file_indices { - if let Some((_, ref mut offset)) = file_cursors.get_mut(&file_idx) { - *offset += rows_to_emit; - } - } - - let merged = RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| Error::UnexpectedError { - message: format!("Failed to build merged RecordBatch: {e}"), - source: Some(Box::new(e)), - })?; - yield merged; - } - } // end else (active_file_indices non-empty) - } - .boxed()) -} - -/// Convert absolute RowRanges to file-local 0-based ranges. -fn to_local_row_ranges( - row_ranges: &[RowRange], - first_row_id: i64, - row_count: i64, -) -> Vec { - let file_end = first_row_id + row_count - 1; - row_ranges - .iter() - .filter_map(|r| { - if r.to() < first_row_id || r.from() > file_end { - return None; - } - let local_from = (r.from() - first_row_id).max(0); - let local_to = (r.to() - first_row_id).min(row_count - 1); - Some(RowRange::new(local_from, local_to)) - }) - .collect() -} - -/// Merge DV and row_ranges into a unified list of 0-based inclusive RowRanges. -/// Returns `None` if no filtering is needed (no DV and no ranges). -/// -/// Complexity: O(D + R) where D = number of deleted rows, R = number of ranges. -fn merge_row_selection( - row_count: i64, - dv: Option<&DeletionVector>, - row_ranges: Option<&[RowRange]>, -) -> Option> { - let has_dv = dv.is_some_and(|d| !d.is_empty()); - let has_ranges = row_ranges.is_some(); - if !has_dv && !has_ranges { - return None; - } - - // Fast path: no DV, just return row_ranges as-is. - if !has_dv { - return row_ranges.map(|r| r.to_vec()); - } - - // Build non-deleted ranges from DV (sorted iterator). - let dv_ranges = dv_to_non_deleted_ranges(dv.unwrap(), row_count); - - match row_ranges { - Some(ranges) => Some(intersect_sorted_ranges(&dv_ranges, ranges)), - None => Some(dv_ranges), - } -} - -/// Convert a DeletionVector into sorted non-deleted inclusive RowRanges. -/// The DV iterator yields sorted deleted positions. -fn dv_to_non_deleted_ranges(dv: &DeletionVector, row_count: i64) -> Vec { - let mut result = Vec::new(); - let mut cursor: i64 = 0; - for deleted in dv.iter() { - let del = deleted as i64; - if del >= row_count { - break; - } - if del > cursor { - result.push(RowRange::new(cursor, del - 1)); - } - cursor = del + 1; - } - if cursor < row_count { - result.push(RowRange::new(cursor, row_count - 1)); - } - result -} - -/// Intersect two sorted lists of inclusive RowRanges using a merge-style scan. 
-fn intersect_sorted_ranges(a: &[RowRange], b: &[RowRange]) -> Vec { - let mut result = Vec::new(); - let (mut i, mut j) = (0, 0); - while i < a.len() && j < b.len() { - let from = a[i].from().max(b[j].from()); - let to = a[i].to().min(b[j].to()); - if from <= to { - result.push(RowRange::new(from, to)); - } - // Advance the range that ends first. - if a[i].to() < b[j].to() { - i += 1; - } else { - j += 1; - } - } - result -} - -/// Expand row_ranges into a flat sequence of selected row IDs for a file. -fn expand_selected_row_ids(first_row_id: i64, row_count: i64, row_ranges: &[RowRange]) -> Vec { - if row_count == 0 { - return Vec::new(); - } - let file_end = first_row_id + row_count - 1; - let mut ids = Vec::new(); - for r in row_ranges { - let from = r.from().max(first_row_id); - let to = r.to().min(file_end); - for id in from..=to { - ids.push(id); - } - } - ids -} - -fn attach_row_id( - batch: RecordBatch, - row_id_index: usize, - selected_row_ids: &[i64], - row_id_offset: &mut usize, - output_schema: &Arc, -) -> crate::Result { - let num_rows = batch.num_rows(); - let batch_ids = &selected_row_ids[*row_id_offset..*row_id_offset + num_rows]; - *row_id_offset += num_rows; - let array: Arc = Arc::new(Int64Array::from(batch_ids.to_vec())); - insert_column_at(batch, array, row_id_index, output_schema) -} - -fn insert_column_at( - batch: RecordBatch, - column: Arc, - insert_index: usize, - output_schema: &Arc, -) -> crate::Result { - let mut columns: Vec> = Vec::with_capacity(batch.num_columns() + 1); - for (i, col) in batch.columns().iter().enumerate() { - if i == insert_index { - columns.push(column.clone()); - } - columns.push(col.clone()); - } - if insert_index >= batch.num_columns() { - columns.push(column); - } - RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| Error::UnexpectedError { - message: format!("Failed to insert column into RecordBatch: {e}"), - source: Some(Box::new(e)), - }) -} - -/// Append a null `_ROW_ID` column for files without `first_row_id`. -fn append_null_row_id_column( - batch: RecordBatch, - insert_index: usize, - output_schema: &Arc, -) -> crate::Result { - let array: Arc = Arc::new(Int64Array::new_null(batch.num_rows())); - insert_column_at(batch, array, insert_index, output_schema) -} diff --git a/crates/paimon/src/table/data_evolution_reader.rs b/crates/paimon/src/table/data_evolution_reader.rs new file mode 100644 index 00000000..ec06e141 --- /dev/null +++ b/crates/paimon/src/table/data_evolution_reader.rs @@ -0,0 +1,455 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
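Before the new reader files, a quick stand-alone walk-through of the two row-range helpers this refactor carries over. They are re-implemented here on plain tuples so the example runs by itself; the real versions operate on `RowRange` and `DeletionVector`:

```rust
type Range = (i64, i64); // inclusive bounds

/// Invert a sorted list of deleted positions into surviving ranges.
fn non_deleted_ranges(deleted: &[i64], row_count: i64) -> Vec<Range> {
    let mut out = Vec::new();
    let mut cursor = 0i64;
    for &del in deleted {
        if del >= row_count {
            break;
        }
        if del > cursor {
            out.push((cursor, del - 1));
        }
        cursor = del + 1;
    }
    if cursor < row_count {
        out.push((cursor, row_count - 1));
    }
    out
}

/// Merge-style scan over two sorted range lists.
fn intersect(a: &[Range], b: &[Range]) -> Vec<Range> {
    let (mut i, mut j, mut out) = (0, 0, Vec::new());
    while i < a.len() && j < b.len() {
        let lo = a[i].0.max(b[j].0);
        let hi = a[i].1.min(b[j].1);
        if lo <= hi {
            out.push((lo, hi));
        }
        // Advance whichever range ends first.
        if a[i].1 < b[j].1 { i += 1 } else { j += 1 }
    }
    out
}

fn main() {
    // 10-row file, rows 3 and 4 deleted -> survivors [0,2] and [5,9].
    let survivors = non_deleted_ranges(&[3, 4], 10);
    assert_eq!(survivors, vec![(0, 2), (5, 9)]);
    // The scan additionally asked for rows [2,6]: final selection [2,2] + [5,6].
    assert_eq!(intersect(&survivors, &[(2, 6)]), vec![(2, 2), (5, 6)]);
}
```

This is why `merge_row_selection` is O(D + R): both inputs are sorted, so one forward pass suffices.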
+ +use super::data_file_reader::{ + append_null_row_id_column, attach_row_id, expand_selected_row_ids, insert_column_at, + DataFileReader, +}; +use crate::arrow::build_target_arrow_schema; +use crate::io::FileIO; +use crate::spec::{DataField, DataFileMeta, ROW_ID_FIELD_NAME}; +use crate::table::schema_manager::SchemaManager; +use crate::table::ArrowRecordBatchStream; +use crate::table::RowRange; +use crate::{DataSplit, Error}; +use arrow_array::{Array, Int64Array, RecordBatch}; + +use async_stream::try_stream; +use futures::StreamExt; +use std::collections::HashMap; +use std::sync::Arc; + +/// Reads data files in data evolution mode, merging columns from files +/// that share the same row ID range. +pub(crate) struct DataEvolutionReader { + file_io: FileIO, + schema_manager: SchemaManager, + table_schema_id: i64, + table_fields: Vec, + /// read_type with _ROW_ID filtered out — used for file reads. + file_read_type: Vec, + /// Position of _ROW_ID in the original read_type, if requested. + row_id_index: Option, + /// Arrow schema for the full output (including _ROW_ID if requested). + output_schema: Arc, +} + +impl DataEvolutionReader { + pub(crate) fn new( + file_io: FileIO, + schema_manager: SchemaManager, + table_schema_id: i64, + table_fields: Vec, + read_type: Vec, + ) -> crate::Result { + let row_id_index = read_type.iter().position(|f| f.name() == ROW_ID_FIELD_NAME); + let file_read_type: Vec = read_type + .iter() + .filter(|f| f.name() != ROW_ID_FIELD_NAME) + .cloned() + .collect(); + let output_schema = build_target_arrow_schema(&read_type)?; + + Ok(Self { + file_io, + schema_manager, + table_schema_id, + table_fields, + file_read_type, + row_id_index, + output_schema, + }) + } + + /// Read data files in data evolution mode. + /// + /// Each DataSplit contains files grouped by `first_row_id`. Files within a split may contain + /// different columns for the same logical rows. This method reads each file and merges them + /// column-wise, respecting `max_sequence_number` for conflict resolution. + pub fn read(self, data_splits: &[DataSplit]) -> crate::Result { + let splits: Vec = data_splits.to_vec(); + + Ok(try_stream! 
{ + let file_reader = DataFileReader::new( + self.file_io.clone(), + self.schema_manager.clone(), + self.table_schema_id, + self.table_fields.clone(), + self.file_read_type.clone(), + Vec::new(), + ); + + for split in splits { + let row_ranges = split.row_ranges().map(|r| r.to_vec()); + + if split.raw_convertible() || split.data_files().len() == 1 { + for file_meta in split.data_files().to_vec() { + let data_fields: Option> = if file_meta.schema_id != self.table_schema_id { + let data_schema = self.schema_manager.schema(file_meta.schema_id).await?; + Some(data_schema.fields().to_vec()) + } else { + None + }; + + let has_row_id = file_meta.first_row_id.is_some(); + let effective_row_ranges = if has_row_id { row_ranges.clone() } else { None }; + + let selected_row_ids = if self.row_id_index.is_some() && has_row_id { + effective_row_ranges.as_ref().map(|ranges| { + expand_selected_row_ids( + file_meta.first_row_id.unwrap(), + file_meta.row_count, + ranges, + ) + }) + } else { + None + }; + let file_base_row_id = file_meta.first_row_id.unwrap_or(0); + let mut row_id_cursor = file_base_row_id; + let mut row_id_offset: usize = 0; + + let mut stream = file_reader.read_single_file_stream( + &split, + file_meta, + data_fields, + None, + effective_row_ranges, + )?; + while let Some(batch) = stream.next().await { + let batch = batch?; + let num_rows = batch.num_rows(); + if let Some(idx) = self.row_id_index { + if !has_row_id { + yield append_null_row_id_column(batch, idx, &self.output_schema)?; + } else if let Some(ref ids) = selected_row_ids { + yield attach_row_id(batch, idx, ids, &mut row_id_offset, &self.output_schema)?; + } else { + let row_ids: Vec = (row_id_cursor..row_id_cursor + num_rows as i64).collect(); + row_id_cursor += num_rows as i64; + let array: Arc = Arc::new(Int64Array::from(row_ids)); + yield insert_column_at(batch, array, idx, &self.output_schema)?; + } + } else { + yield batch; + } + } + } + } else { + let files = split.data_files(); + if !files.iter().all(|f| f.first_row_id.is_some()) { + Err(Error::UnexpectedError { + message: "All files in a field merge split should have first_row_id".to_string(), + source: None, + })?; + } + if !files.iter().all(|f| f.row_count == files[0].row_count) { + Err(Error::UnexpectedError { + message: "All files in a field merge split should have the same row count".to_string(), + source: None, + })?; + } + if !files.iter().all(|f| f.first_row_id == files[0].first_row_id) { + Err(Error::UnexpectedError { + message: "All files in a field merge split should have the same first row id".to_string(), + source: None, + })?; + } + + let group_base_row_id = files[0].first_row_id; + let has_group_row_id = group_base_row_id.is_some(); + let group_row_count = files[0].row_count; + let effective_row_ranges = if has_group_row_id { row_ranges.clone() } else { None }; + + let selected_row_ids = if self.row_id_index.is_some() && has_group_row_id { + effective_row_ranges.as_ref().map(|ranges| { + expand_selected_row_ids( + group_base_row_id.unwrap(), + group_row_count, + ranges, + ) + }) + } else { + None + }; + let mut row_id_cursor = group_base_row_id.unwrap_or(0); + let mut row_id_offset: usize = 0; + + let mut merge_stream = self.merge_files_by_columns( + &split, + effective_row_ranges, + )?; + while let Some(batch) = merge_stream.next().await { + let batch = batch?; + let num_rows = batch.num_rows(); + if let Some(idx) = self.row_id_index { + if !has_group_row_id { + yield append_null_row_id_column(batch, idx, &self.output_schema)?; + } else if let Some(ref 
ids) = selected_row_ids { + yield attach_row_id(batch, idx, ids, &mut row_id_offset, &self.output_schema)?; + } else { + let row_ids: Vec = (row_id_cursor..row_id_cursor + num_rows as i64).collect(); + row_id_cursor += num_rows as i64; + let array: Arc = Arc::new(Int64Array::from(row_ids)); + yield insert_column_at(batch, array, idx, &self.output_schema)?; + } + } else { + yield batch; + } + } + } + } + } + .boxed()) + } + + /// Merge multiple files column-wise for data evolution, streaming with bounded memory. + /// + /// Uses field IDs (not column names) to resolve which file provides which column, + /// ensuring correctness across schema evolution (column rename, add, drop). + fn merge_files_by_columns( + &self, + split: &DataSplit, + row_ranges: Option>, + ) -> crate::Result { + let data_files = split.data_files(); + if data_files.is_empty() { + return Ok(futures::stream::empty().boxed()); + } + + let file_io = self.file_io.clone(); + let schema_manager = self.schema_manager.clone(); + let table_schema_id = self.table_schema_id; + let split = split.clone(); + let data_files: Vec = data_files.to_vec(); + let read_type = self.file_read_type.clone(); + let table_fields = self.table_fields.clone(); + // Batch size for column-merge output. Matches the default Parquet reader batch size. + const MERGE_BATCH_SIZE: usize = 1024; + let output_batch_size: usize = MERGE_BATCH_SIZE; + let target_schema = build_target_arrow_schema(&read_type)?; + + Ok(try_stream! { + // Pre-load schemas and collect field IDs + data_fields per file. + let mut file_info: HashMap, Option>)> = HashMap::new(); + + for (file_idx, file_meta) in data_files.iter().enumerate() { + let (field_ids, data_fields) = if file_meta.schema_id != table_schema_id { + let file_schema = schema_manager.schema(file_meta.schema_id).await?; + let file_fields = file_schema.fields(); + + let ids: Vec = if let Some(ref wc) = file_meta.write_cols { + wc.iter() + .filter_map(|name| file_fields.iter().find(|f| f.name() == name).map(|f| f.id())) + .collect() + } else { + file_fields.iter().map(|f| f.id()).collect() + }; + + (ids, Some(file_fields.to_vec())) + } else { + let ids: Vec = if let Some(ref wc) = file_meta.write_cols { + wc.iter() + .filter_map(|name| table_fields.iter().find(|f| f.name() == name).map(|f| f.id())) + .collect() + } else { + table_fields.iter().map(|f| f.id()).collect() + }; + + (ids, None) + }; + + file_info.insert(file_idx, (field_ids, data_fields)); + } + + // Determine which file provides each field ID, resolving conflicts by max_sequence_number. + let mut field_id_source: HashMap = HashMap::new(); + for (file_idx, file_meta) in data_files.iter().enumerate() { + let (ref field_ids, _) = file_info[&file_idx]; + for &fid in field_ids { + let entry = field_id_source + .entry(fid) + .or_insert((file_idx, i64::MIN)); + if file_meta.max_sequence_number > entry.1 { + *entry = (file_idx, file_meta.max_sequence_number); + } + } + } + + // For each projected field, determine which file provides it (by field ID). 
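The `selected_row_ids` bookkeeping above is easiest to follow with concrete numbers. A self-contained illustration (plain tuples instead of `RowRange`, invented row counts) of expanding the surviving global row IDs once and then doling them out batch by batch, the way `expand_selected_row_ids` and `attach_row_id` cooperate:

```rust
fn expand_selected_row_ids(first_row_id: i64, row_count: i64, ranges: &[(i64, i64)]) -> Vec<i64> {
    let file_end = first_row_id + row_count - 1;
    let mut ids = Vec::new();
    for &(from, to) in ranges {
        // Clamp each requested range to the file's own row span.
        for id in from.max(first_row_id)..=to.min(file_end) {
            ids.push(id);
        }
    }
    ids
}

fn main() {
    // File covers global rows 100..=109; the scan selected [98,102] and [107,120].
    let ids = expand_selected_row_ids(100, 10, &[(98, 102), (107, 120)]);
    assert_eq!(ids, vec![100, 101, 102, 107, 108, 109]);

    // Two decoded batches of 4 and 2 rows consume the IDs in order via a
    // moving offset, exactly how attach_row_id advances row_id_offset.
    let mut offset = 0usize;
    for batch_rows in [4usize, 2] {
        let batch_ids = &ids[offset..offset + batch_rows];
        offset += batch_rows;
        println!("attach {batch_ids:?}");
    }
}
```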
+ let mut file_read_columns: HashMap> = HashMap::new(); + for field in &read_type { + if let Some(&(file_idx, _)) = field_id_source.get(&field.id()) { + file_read_columns + .entry(file_idx) + .or_default() + .push(field.name().to_string()); + } + } + + let column_plan: Vec<(Option, String)> = read_type + .iter() + .map(|field| { + let file_idx = field_id_source.get(&field.id()).map(|&(idx, _)| idx); + (file_idx, field.name().to_string()) + }) + .collect(); + + let active_file_indices: Vec = file_read_columns.keys().copied().collect(); + + // Edge case: no file provides any projected column. + if active_file_indices.is_empty() { + let first_row_id = data_files[0].first_row_id.unwrap_or(0); + let file_row_count = data_files[0].row_count; + let total_rows = match &row_ranges { + Some(ranges) => expand_selected_row_ids(first_row_id, file_row_count, ranges).len(), + None => file_row_count as usize, + }; + let mut emitted = 0; + while emitted < total_rows { + let rows_to_emit = (total_rows - emitted).min(output_batch_size); + let columns: Vec> = target_schema + .fields() + .iter() + .map(|f| arrow_array::new_null_array(f.data_type(), rows_to_emit)) + .collect(); + let batch = if columns.is_empty() { + RecordBatch::try_new_with_options( + target_schema.clone(), + columns, + &arrow_array::RecordBatchOptions::new().with_row_count(Some(rows_to_emit)), + ) + } else { + RecordBatch::try_new(target_schema.clone(), columns) + } + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to build NULL-filled RecordBatch: {e}"), + source: Some(Box::new(e)), + })?; + emitted += rows_to_emit; + yield batch; + } + } else { + // Open a stream for each active file via DataFileReader. + let mut file_streams: HashMap = HashMap::new(); + for &file_idx in &active_file_indices { + let file_cols = file_read_columns.get(&file_idx).cloned().unwrap_or_default(); + let file_read_type: Vec = file_cols + .iter() + .filter_map(|col_name| read_type.iter().find(|f| f.name() == col_name).cloned()) + .collect(); + + let (_, ref data_fields) = file_info[&file_idx]; + + let file_reader = DataFileReader::new( + file_io.clone(), + schema_manager.clone(), + table_schema_id, + table_fields.clone(), + file_read_type, + Vec::new(), + ); + let stream = file_reader.read_single_file_stream( + &split, + data_files[file_idx].clone(), + data_fields.clone(), + None, + row_ranges.clone(), + )?; + file_streams.insert(file_idx, stream); + } + + // Per-file cursor: current batch + offset within it. + let mut file_cursors: HashMap = HashMap::new(); + + loop { + for &file_idx in &active_file_indices { + let needs_next = match file_cursors.get(&file_idx) { + None => true, + Some((batch, offset)) => *offset >= batch.num_rows(), + }; + if needs_next { + file_cursors.remove(&file_idx); + if let Some(stream) = file_streams.get_mut(&file_idx) { + if let Some(batch_result) = stream.next().await { + let batch = batch_result?; + if batch.num_rows() > 0 { + file_cursors.insert(file_idx, (batch, 0)); + } + } + } + } + } + + // All files in a merge group have the same row count (validated above), + // so any file stream exhausting means all streams are done. 
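The cursor loop further below is the heart of the column merge. A toy model of a single iteration, with plain `Vec<i32>` columns standing in for Arrow arrays and invented batch sizes:

```rust
struct Cursor {
    batch: Vec<i32>, // the file's current decoded batch
    offset: usize,   // rows already consumed from it
}

fn main() {
    // File A decoded a 5-row batch; file B a 3-row batch (more pending).
    let mut cursors = vec![
        Cursor { batch: vec![1, 2, 3, 4, 5], offset: 0 },
        Cursor { batch: vec![10, 20, 30], offset: 0 },
    ];

    // Emit min(remaining) rows so the merged columns stay row-aligned.
    let remaining = cursors
        .iter()
        .map(|c| c.batch.len() - c.offset)
        .min()
        .unwrap_or(0);
    let rows_to_emit = remaining.min(1024); // output batch size cap

    let merged: Vec<Vec<i32>> = cursors
        .iter()
        .map(|c| c.batch[c.offset..c.offset + rows_to_emit].to_vec())
        .collect();
    for c in &mut cursors {
        c.offset += rows_to_emit; // exhausted cursors refill from their stream
    }

    assert_eq!(merged, vec![vec![1, 2, 3], vec![10, 20, 30]]);
    assert_eq!(cursors[0].offset, 3); // file A still has rows 4, 5 buffered
}
```

Because each source yields batches at its own cadence, slicing to the minimum remaining rows keeps memory bounded to one in-flight batch per file.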
+ if active_file_indices.iter().any(|idx| !file_cursors.contains_key(idx)) { + break; + } + + let remaining: usize = active_file_indices + .iter() + .map(|idx| { + let (batch, offset) = file_cursors.get(idx).unwrap(); + batch.num_rows() - offset + }) + .min() + .unwrap_or(0); + + if remaining == 0 { + break; + } + + let rows_to_emit = remaining.min(output_batch_size); + + let mut columns: Vec> = + Vec::with_capacity(column_plan.len()); + + for (i, (file_idx_opt, col_name)) in column_plan.iter().enumerate() { + let target_field = &target_schema.fields()[i]; + let col = file_idx_opt + .and_then(|file_idx| file_cursors.get(&file_idx)) + .and_then(|(batch, offset)| { + batch + .schema() + .index_of(col_name) + .ok() + .map(|col_idx| batch.column(col_idx).slice(*offset, rows_to_emit)) + }); + + columns.push(col.unwrap_or_else(|| { + arrow_array::new_null_array(target_field.data_type(), rows_to_emit) + })); + } + + for &file_idx in &active_file_indices { + if let Some((_, ref mut offset)) = file_cursors.get_mut(&file_idx) { + *offset += rows_to_emit; + } + } + + let merged = RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| Error::UnexpectedError { + message: format!("Failed to build merged RecordBatch: {e}"), + source: Some(Box::new(e)), + })?; + yield merged; + } + } + } + .boxed()) + } +} diff --git a/crates/paimon/src/table/data_file_reader.rs b/crates/paimon/src/table/data_file_reader.rs new file mode 100644 index 00000000..b5f16805 --- /dev/null +++ b/crates/paimon/src/table/data_file_reader.rs @@ -0,0 +1,457 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::build_target_arrow_schema; +use crate::arrow::format::create_format_reader; +use crate::arrow::schema_evolution::{create_index_mapping, NULL_FIELD_INDEX}; +use crate::deletion_vector::{DeletionVector, DeletionVectorFactory}; +use crate::io::FileIO; +use crate::spec::{DataField, DataFileMeta, Predicate}; +use crate::table::schema_manager::SchemaManager; +use crate::table::ArrowRecordBatchStream; +use crate::table::RowRange; +use crate::{DataSplit, Error}; +use arrow_array::{Array, Int64Array, RecordBatch}; +use arrow_cast::cast; + +use async_stream::try_stream; +use futures::StreamExt; +use std::sync::Arc; + +/// Reads data from Parquet files. 
+#[derive(Clone)]
+pub(crate) struct DataFileReader {
+    file_io: FileIO,
+    schema_manager: SchemaManager,
+    table_schema_id: i64,
+    table_fields: Vec<DataField>,
+    read_type: Vec<DataField>,
+    predicates: Vec<Predicate>,
+}
+
+impl DataFileReader {
+    pub(crate) fn new(
+        file_io: FileIO,
+        schema_manager: SchemaManager,
+        table_schema_id: i64,
+        table_fields: Vec<DataField>,
+        read_type: Vec<DataField>,
+        predicates: Vec<Predicate>,
+    ) -> Self {
+        Self {
+            file_io,
+            schema_manager,
+            table_schema_id,
+            table_fields,
+            read_type,
+            predicates,
+        }
+    }
+
+    /// Take a slice of DataSplits and read every data file in each split.
+    /// Returns a stream of Arrow RecordBatches from all files.
+    ///
+    /// Uses SchemaManager to load the data file's schema (via `DataFileMeta.schema_id`)
+    /// and computes field-ID-based index mapping for schema evolution (added columns,
+    /// type promotion, column reordering).
+    ///
+    /// Matches [RawFileSplitRead.createReader](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java).
+    pub fn read(self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
+        let splits: Vec<DataSplit> = data_splits.to_vec();
+        let reader = self;
+        Ok(try_stream! {
+            for split in splits {
+                // Create DV factory for this split only.
+                let dv_factory = if split
+                    .data_deletion_files()
+                    .is_some_and(|files| files.iter().any(Option::is_some))
+                {
+                    Some(
+                        DeletionVectorFactory::new(
+                            &reader.file_io,
+                            split.data_files(),
+                            split.data_deletion_files(),
+                        )
+                        .await?,
+                    )
+                } else {
+                    None
+                };
+
+                for file_meta in split.data_files().to_vec() {
+                    let dv = dv_factory
+                        .as_ref()
+                        .and_then(|factory| factory.get_deletion_vector(&file_meta.file_name))
+                        .cloned();
+
+                    // Load data file's schema if it differs from the table schema.
+                    let data_fields: Option<Vec<DataField>> = if file_meta.schema_id != reader.table_schema_id {
+                        let data_schema = reader.schema_manager.schema(file_meta.schema_id).await?;
+                        Some(data_schema.fields().to_vec())
+                    } else {
+                        None
+                    };
+
+                    let mut stream = reader.read_single_file_stream(
+                        &split,
+                        file_meta,
+                        data_fields,
+                        dv,
+                        None,
+                    )?;
+                    while let Some(batch) = stream.next().await {
+                        yield batch?;
+                    }
+                }
+            }
+        }
+        .boxed())
+    }
+
+    /// Read a single parquet file from a split, returning a lazy stream of batches.
+    /// Optionally applies a deletion vector.
+    ///
+    /// Handles schema evolution using field-ID-based index mapping:
+    /// - `data_fields`: if `Some`, the fields from the data file's schema (loaded via SchemaManager).
+    ///   Used to compute index mapping between `read_type` and data fields by field ID.
+    /// - Columns missing from the file are filled with null arrays.
+    /// - Columns whose Arrow type differs from the target type are cast (type promotion).
+    ///
+    /// Reference: [RawFileSplitRead.createFileReader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java)
+    pub(super) fn read_single_file_stream(
+        &self,
+        split: &DataSplit,
+        file_meta: DataFileMeta,
+        data_fields: Option<Vec<DataField>>,
+        dv: Option<Arc<DeletionVector>>,
+        row_ranges: Option<Vec<RowRange>>,
+    ) -> crate::Result<ArrowRecordBatchStream> {
+        let read_type = self.read_type.clone();
+        let table_fields = self.table_fields.clone();
+        let predicates = self.predicates.clone();
+        let file_io = self.file_io.clone();
+        let split = split.clone();
+
+        let target_schema = build_target_arrow_schema(&read_type)?;
+        let file_fields = data_fields.clone().unwrap_or_else(|| table_fields.clone());
+
+        // Compute index mapping and determine which columns to read from the file.
+        let (projected_read_fields, index_mapping) = if let Some(ref df) = data_fields {
+            let mapping = create_index_mapping(&read_type, df);
+            match mapping {
+                Some(ref idx_map) => {
+                    let mut seen = std::collections::HashSet::new();
+                    let fields_to_read: Vec<DataField> = idx_map
+                        .iter()
+                        .filter(|&&idx| idx != NULL_FIELD_INDEX && seen.insert(idx))
+                        .map(|&idx| df[idx as usize].clone())
+                        .collect();
+                    (fields_to_read, Some(idx_map.clone()))
+                }
+                None => (df.clone(), None),
+            }
+        } else {
+            (read_type.clone(), None)
+        };
+
+        // Remap predicates from table-level to file-level indices.
+        let file_predicates = {
+            let remapped = crate::arrow::filtering::remap_predicates_to_file(
+                &predicates,
+                &table_fields,
+                &file_fields,
+            );
+            if remapped.is_empty() {
+                None
+            } else {
+                Some(crate::arrow::format::FilePredicates {
+                    predicates: remapped,
+                    file_fields: file_fields.clone(),
+                })
+            }
+        };
+
+        Ok(try_stream! {
+            let path_to_read = split.data_file_path(&file_meta);
+            let format_reader = create_format_reader(&path_to_read)?;
+            let input_file = file_io.new_input(&path_to_read)?;
+            let file_reader = input_file.reader().await?;
+            let local_ranges = row_ranges.as_ref().map(|ranges| {
+                to_local_row_ranges(ranges, file_meta.first_row_id.unwrap_or(0), file_meta.row_count)
+            });
+
+            let row_selection = merge_row_selection(
+                file_meta.row_count,
+                dv.as_deref(),
+                local_ranges.as_deref(),
+            );
+
+            let mut batch_stream = format_reader.read_batch_stream(
+                Box::new(file_reader),
+                file_meta.file_size as u64,
+                &projected_read_fields,
+                file_predicates.as_ref(),
+                None,
+                row_selection,
+            ).await?;
+
+            while let Some(batch) = batch_stream.next().await {
+                let batch = batch?;
+                let num_rows = batch.num_rows();
+                let batch_schema = batch.schema();
+
+                // Build output columns using index mapping (field-ID-based) or by name.
+                let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(target_schema.fields().len());
+                for (i, target_field) in target_schema.fields().iter().enumerate() {
+                    let source_col = if let Some(ref idx_map) = index_mapping {
+                        let data_idx = idx_map[i];
+                        if data_idx == NULL_FIELD_INDEX {
+                            None
+                        } else {
+                            let data_field = &data_fields.as_ref().unwrap()[data_idx as usize];
+                            batch_schema
+                                .index_of(data_field.name())
+                                .ok()
+                                .map(|col_idx| batch.column(col_idx))
+                        }
+                    } else if let Some(ref df) = data_fields {
+                        batch_schema
+                            .index_of(df[i].name())
+                            .ok()
+                            .map(|col_idx| batch.column(col_idx))
+                    } else {
+                        batch_schema
+                            .index_of(target_field.name())
+                            .ok()
+                            .map(|col_idx| batch.column(col_idx))
+                    };
+
+                    match source_col {
+                        Some(col) => {
+                            if col.data_type() == target_field.data_type() {
+                                columns.push(col.clone());
+                            } else {
+                                let casted = cast(col, target_field.data_type()).map_err(|e| {
+                                    Error::UnexpectedError {
+                                        message: format!(
+                                            "Failed to cast column '{}' from {:?} to {:?}: {e}",
+                                            target_field.name(),
+                                            col.data_type(),
+                                            target_field.data_type()
+                                        ),
+                                        source: Some(Box::new(e)),
+                                    }
+                                })?;
+                                columns.push(casted);
+                            }
+                        }
+                        None => {
+                            let null_array = arrow_array::new_null_array(target_field.data_type(), num_rows);
+                            columns.push(null_array);
+                        }
+                    }
+                }
+
+                let result = if columns.is_empty() {
+                    RecordBatch::try_new_with_options(
+                        target_schema.clone(),
+                        columns,
+                        &arrow_array::RecordBatchOptions::new().with_row_count(Some(num_rows)),
+                    )
+                } else {
+                    RecordBatch::try_new(target_schema.clone(), columns)
+                }
+                .map_err(|e| {
+                    Error::UnexpectedError {
+                        message: format!("Failed to build schema-evolved RecordBatch: {e}"),
+                        source: Some(Box::new(e)),
+                    }
+                })?;
+                yield result;
+            }
+        }
+        .boxed())
+    }
+}
+
+/// Convert absolute RowRanges to file-local 0-based ranges.
+fn to_local_row_ranges(
+    row_ranges: &[RowRange],
+    first_row_id: i64,
+    row_count: i64,
+) -> Vec<RowRange> {
+    let file_end = first_row_id + row_count - 1;
+    row_ranges
+        .iter()
+        .filter_map(|r| {
+            if r.to() < first_row_id || r.from() > file_end {
+                return None;
+            }
+            let local_from = (r.from() - first_row_id).max(0);
+            let local_to = (r.to() - first_row_id).min(row_count - 1);
+            Some(RowRange::new(local_from, local_to))
+        })
+        .collect()
+}
+
+/// Merge DV and row_ranges into a unified list of 0-based inclusive RowRanges.
+/// Returns `None` if no filtering is needed (no DV and no ranges).
+///
+/// Complexity: O(D + R) where D = number of deleted rows, R = number of ranges.
+fn merge_row_selection(
+    row_count: i64,
+    dv: Option<&DeletionVector>,
+    row_ranges: Option<&[RowRange]>,
+) -> Option<Vec<RowRange>> {
+    let has_dv = dv.is_some_and(|d| !d.is_empty());
+    let has_ranges = row_ranges.is_some();
+    if !has_dv && !has_ranges {
+        return None;
+    }
+
+    if !has_dv {
+        return row_ranges.map(|r| r.to_vec());
+    }
+
+    let dv_ranges = dv_to_non_deleted_ranges(dv.unwrap(), row_count);
+
+    match row_ranges {
+        Some(ranges) => Some(intersect_sorted_ranges(&dv_ranges, ranges)),
+        None => Some(dv_ranges),
+    }
+}
+
+/// Convert a DeletionVector into sorted non-deleted inclusive RowRanges.
+fn dv_to_non_deleted_ranges(dv: &DeletionVector, row_count: i64) -> Vec<RowRange> {
+    let mut result = Vec::new();
+    let mut cursor: i64 = 0;
+    for deleted in dv.iter() {
+        let del = deleted as i64;
+        if del >= row_count {
+            break;
+        }
+        if del > cursor {
+            result.push(RowRange::new(cursor, del - 1));
+        }
+        cursor = del + 1;
+    }
+    if cursor < row_count {
+        result.push(RowRange::new(cursor, row_count - 1));
+    }
+    result
+}
+
+/// Intersect two sorted lists of inclusive RowRanges using a merge-style scan.
+fn intersect_sorted_ranges(a: &[RowRange], b: &[RowRange]) -> Vec<RowRange> {
+    let mut result = Vec::new();
+    let (mut i, mut j) = (0, 0);
+    while i < a.len() && j < b.len() {
+        let from = a[i].from().max(b[j].from());
+        let to = a[i].to().min(b[j].to());
+        if from <= to {
+            result.push(RowRange::new(from, to));
+        }
+        if a[i].to() < b[j].to() {
+            i += 1;
+        } else {
+            j += 1;
+        }
+    }
+    result
+}
+
+/// Expand row_ranges into a flat sequence of selected row IDs for a file.
+/// Intended for per-batch _ROW_ID attachment — callers should not pass
+/// whole-file ranges with millions of rows, as this allocates a `Vec<i64>`
+/// proportional to the selected range size.
+pub(super) fn expand_selected_row_ids(
+    first_row_id: i64,
+    row_count: i64,
+    row_ranges: &[RowRange],
+) -> Vec<i64> {
+    if row_count == 0 {
+        return Vec::new();
+    }
+    let file_end = first_row_id + row_count - 1;
+    let mut ids = Vec::new();
+    for r in row_ranges {
+        let from = r.from().max(first_row_id);
+        let to = r.to().min(file_end);
+        for id in from..=to {
+            ids.push(id);
+        }
+    }
+    ids
+}
+
+pub(super) fn attach_row_id(
+    batch: RecordBatch,
+    row_id_index: usize,
+    selected_row_ids: &[i64],
+    row_id_offset: &mut usize,
+    output_schema: &Arc<arrow_schema::Schema>,
+) -> crate::Result<RecordBatch> {
+    let num_rows = batch.num_rows();
+    let end = *row_id_offset + num_rows;
+    if end > selected_row_ids.len() {
+        return Err(Error::UnexpectedError {
+            message: format!(
+                "Row ID offset out of bounds: need {}..{} but selected_row_ids has {} entries",
+                *row_id_offset,
+                end,
+                selected_row_ids.len()
+            ),
+            source: None,
+        });
+    }
+    let batch_ids = &selected_row_ids[*row_id_offset..end];
+    *row_id_offset = end;
+    let array: Arc<dyn Array> = Arc::new(Int64Array::from(batch_ids.to_vec()));
+    insert_column_at(batch, array, row_id_index, output_schema)
+}
+
+pub(super) fn insert_column_at(
+    batch: RecordBatch,
+    column: Arc<dyn Array>,
+    insert_index: usize,
+    output_schema: &Arc<arrow_schema::Schema>,
+) -> crate::Result<RecordBatch> {
+    let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns() + 1);
+    for (i, col) in batch.columns().iter().enumerate() {
+        if i == insert_index {
+            columns.push(column.clone());
+        }
+        columns.push(col.clone());
+    }
+    if insert_index >= batch.num_columns() {
+        columns.push(column);
+    }
+    RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| Error::UnexpectedError {
+        message: format!("Failed to insert column into RecordBatch: {e}"),
+        source: Some(Box::new(e)),
+    })
+}
+
+/// Append a null `_ROW_ID` column for files without `first_row_id`.
+
+/// Append a null `_ROW_ID` column for files without `first_row_id`.
+pub(super) fn append_null_row_id_column(
+    batch: RecordBatch,
+    insert_index: usize,
+    output_schema: &Arc<Schema>,
+) -> crate::Result<RecordBatch> {
+    let array: Arc<dyn Array> = Arc::new(Int64Array::new_null(batch.num_rows()));
+    insert_column_at(batch, array, insert_index, output_schema)
+}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 431857f7..2ec5cd96 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -20,7 +20,9 @@ pub(crate) mod bin_pack;
 mod bucket_filter;
 mod commit_message;
+mod data_evolution_reader;
 pub mod data_evolution_writer;
+mod data_file_reader;
 mod data_file_writer;
 #[cfg(feature = "fulltext")]
 mod full_text_search_builder;
@@ -34,6 +36,7 @@ mod snapshot_manager;
 mod source;
 mod stats_filter;
 pub(crate) mod table_commit;
+mod table_read;
 mod table_scan;
 pub(crate) mod table_write;
 mod tag_manager;
@@ -46,7 +49,7 @@ pub use data_evolution_writer::DataEvolutionWriter;
 #[cfg(feature = "fulltext")]
 pub use full_text_search_builder::FullTextSearchBuilder;
 use futures::stream::BoxStream;
-pub use read_builder::{ReadBuilder, TableRead};
+pub use read_builder::ReadBuilder;
 pub use rest_env::RESTEnv;
 pub use schema_manager::SchemaManager;
 pub use snapshot_commit::{RESTSnapshotCommit, RenamingSnapshotCommit, SnapshotCommit};
@@ -55,6 +58,7 @@ pub use source::{
     merge_row_ranges, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange,
 };
 pub use table_commit::TableCommit;
+pub use table_read::TableRead;
 pub use table_scan::TableScan;
 pub use table_write::TableWrite;
 pub use tag_manager::TagManager;
diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs
index 63d9917c..3daa24aa 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -21,13 +21,12 @@
 //! and [TypeUtils.project](https://github.com/apache/paimon/blob/master/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java).
 
 use super::bucket_filter::{extract_predicate_for_keys, split_partition_and_data_predicates};
-use super::{ArrowRecordBatchStream, Table, TableScan};
+use super::table_read::TableRead;
+use super::{Table, TableScan};
 use crate::arrow::filtering::reader_pruning_predicates;
-use crate::arrow::ArrowReaderBuilder;
 use crate::spec::{CoreOptions, DataField, Predicate};
 use crate::table::source::RowRange;
-use crate::Result;
-use crate::{DataSplit, Error};
+use crate::{Error, Result};
 use std::collections::{HashMap, HashSet};
 
 #[derive(Debug, Clone, Default)]
@@ -37,7 +36,10 @@ struct NormalizedFilter {
     bucket_predicate: Option<Predicate>,
 }
 
-fn split_scan_predicates(table: &Table, filter: Predicate) -> (Option<Predicate>, Vec<Predicate>) {
+pub(super) fn split_scan_predicates(
+    table: &Table,
+    filter: Predicate,
+) -> (Option<Predicate>, Vec<Predicate>) {
     let partition_keys = table.schema().partition_keys();
     if partition_keys.is_empty() {
         (None, filter.split_and())
@@ -91,11 +93,6 @@ fn normalize_filter(table: &Table, filter: Predicate) -> NormalizedFilter {
     }
 }
 
-fn read_data_predicates(table: &Table, filter: Predicate) -> Vec<Predicate> {
-    let (_, data_predicates) = split_scan_predicates(table, filter);
-    reader_pruning_predicates(data_predicates)
-}
-
 /// Builder for table scan and table read (new_scan, new_read).
 ///
 /// Rust keeps a names-based projection API for ergonomics, while aligning the
@@ -262,95 +259,9 @@ impl<'a> ReadBuilder<'a> {
     }
 }
 
-/// Table read: reads data from splits (e.g. produced by [TableScan::plan]).
-///
-/// Reference: [pypaimon.read.table_read.TableRead](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_read.py)
-#[derive(Debug, Clone)]
-pub struct TableRead<'a> {
-    table: &'a Table,
-    read_type: Vec<DataField>,
-    data_predicates: Vec<Predicate>,
-}
-
-impl<'a> TableRead<'a> {
-    /// Create a new TableRead with a specific read type (projected fields).
-    pub fn new(
-        table: &'a Table,
-        read_type: Vec<DataField>,
-        data_predicates: Vec<Predicate>,
-    ) -> Self {
-        Self {
-            table,
-            read_type,
-            data_predicates,
-        }
-    }
-
-    /// Schema (fields) that this read will produce.
-    pub fn read_type(&self) -> &[DataField] {
-        &self.read_type
-    }
-
-    /// Data predicates for read-side pruning.
-    pub fn data_predicates(&self) -> &[Predicate] {
-        &self.data_predicates
-    }
-
-    /// Table for this read.
-    pub fn table(&self) -> &Table {
-        self.table
-    }
-
-    /// Set a filter predicate for conservative read-side pruning.
-    ///
-    /// This is the direct-`TableRead` equivalent of [`ReadBuilder::with_filter`].
-    /// Supported non-partition data predicates may be used only on the regular
-    /// Parquet read path for row-group pruning and native Parquet row
-    /// filtering. Callers should still keep residual filtering at the query
-    /// layer for unsupported predicates, non-Parquet files, and data-evolution
-    /// reads.
-    pub fn with_filter(mut self, filter: Predicate) -> Self {
-        self.data_predicates = read_data_predicates(self.table, filter);
-        self
-    }
-
-    /// Returns an [`ArrowRecordBatchStream`].
-    pub fn to_arrow(&self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
-        // todo: consider get read batch size from table
-        let has_primary_keys = !self.table.schema.primary_keys().is_empty();
-        let core_options = CoreOptions::new(self.table.schema.options());
-        let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
-        let data_evolution = core_options.data_evolution_enabled();
-
-        if has_primary_keys && !deletion_vectors_enabled {
-            return Err(Error::Unsupported {
-                message: format!(
-                    "Reading primary-key tables without deletion vectors is not yet supported. Primary keys: {:?}",
-                    self.table.schema.primary_keys()
-                ),
-            });
-        }
-
-        let reader = ArrowReaderBuilder::new(
-            self.table.file_io.clone(),
-            self.table.schema_manager().clone(),
-            self.table.schema().id(),
-        )
-        .with_predicates(self.data_predicates.clone())
-        .with_table_fields(self.table.schema.fields().to_vec())
-        .build(self.read_type().to_vec());
-
-        if data_evolution {
-            reader.read_data_evolution(data_splits, self.table.schema.fields())
-        } else {
-            reader.read(data_splits)
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
-    use super::TableRead;
+    use crate::table::TableRead;
     mod test_utils {
         include!(concat!(env!("CARGO_MANIFEST_DIR"), "/../test_utils.rs"));
     }
diff --git a/crates/paimon/src/table/table_read.rs b/crates/paimon/src/table/table_read.rs
new file mode 100644
index 00000000..cab22f9b
--- /dev/null
+++ b/crates/paimon/src/table/table_read.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::data_evolution_reader::DataEvolutionReader;
+use super::data_file_reader::DataFileReader;
+use super::read_builder::split_scan_predicates;
+use super::{ArrowRecordBatchStream, Table};
+use crate::arrow::filtering::reader_pruning_predicates;
+use crate::spec::{CoreOptions, DataField, Predicate};
+use crate::{DataSplit, Error};
+
+/// Table read: reads data from splits (e.g. produced by [TableScan::plan]).
+///
+/// Reference: [pypaimon.read.table_read.TableRead](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_read.py)
+#[derive(Debug, Clone)]
+pub struct TableRead<'a> {
+    table: &'a Table,
+    read_type: Vec<DataField>,
+    data_predicates: Vec<Predicate>,
+}
+
+impl<'a> TableRead<'a> {
+    /// Create a new TableRead with a specific read type (projected fields).
+    pub fn new(
+        table: &'a Table,
+        read_type: Vec<DataField>,
+        data_predicates: Vec<Predicate>,
+    ) -> Self {
+        Self {
+            table,
+            read_type,
+            data_predicates,
+        }
+    }
+
+    /// Schema (fields) that this read will produce.
+    pub fn read_type(&self) -> &[DataField] {
+        &self.read_type
+    }
+
+    /// Data predicates for read-side pruning.
+    pub fn data_predicates(&self) -> &[Predicate] {
+        &self.data_predicates
+    }
+
+    /// Table for this read.
+    pub fn table(&self) -> &Table {
+        self.table
+    }
+
+    /// Set a filter predicate for conservative read-side pruning.
+    pub fn with_filter(mut self, filter: Predicate) -> Self {
+        let (_, data_predicates) = split_scan_predicates(self.table, filter);
+        self.data_predicates = reader_pruning_predicates(data_predicates);
+        self
+    }
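+
+    // Illustrative usage sketch (hypothetical shape; `new_scan` and `new_read`
+    // are named in the ReadBuilder docs, but exact signatures and the plan's
+    // split accessor may differ):
+    //
+    //     let plan = read_builder.new_scan()?.plan().await?;
+    //     let read = read_builder.new_read()?;
+    //     let mut stream = read.to_arrow(plan.splits())?; // ArrowRecordBatchStream
+    //     while let Some(batch) = stream.next().await { /* consume batches */ }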
+
+    /// Returns an [`ArrowRecordBatchStream`].
+    pub fn to_arrow(&self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
+        let has_primary_keys = !self.table.schema.primary_keys().is_empty();
+        let core_options = CoreOptions::new(self.table.schema.options());
+        let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
+        let data_evolution = core_options.data_evolution_enabled();
+
+        if has_primary_keys && !deletion_vectors_enabled {
+            return Err(Error::Unsupported {
+                message: format!(
+                    "Reading primary-key tables without deletion vectors is not yet supported. Primary keys: {:?}",
+                    self.table.schema.primary_keys()
+                ),
+            });
+        }
+
+        if data_evolution {
+            // TODO: data evolution mode does not support read-side predicate pruning yet.
+            let reader = DataEvolutionReader::new(
+                self.table.file_io.clone(),
+                self.table.schema_manager().clone(),
+                self.table.schema().id(),
+                self.table.schema.fields().to_vec(),
+                self.read_type().to_vec(),
+            )?;
+            reader.read(data_splits)
+        } else {
+            let reader = DataFileReader::new(
+                self.table.file_io.clone(),
+                self.table.schema_manager().clone(),
+                self.table.schema().id(),
+                self.table.schema.fields().to_vec(),
+                self.read_type().to_vec(),
+                self.data_predicates.clone(),
+            );
+            reader.read(data_splits)
+        }
+    }
+}
diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs
index 45db333c..6feda239 100644
--- a/crates/paimon/src/table/write_builder.rs
+++ b/crates/paimon/src/table/write_builder.rs
@@ -24,7 +24,7 @@ use uuid::Uuid;
 
 /// Builder for creating table writers and committers.
 ///
-/// Provides `new_write` (TODO) and `new_commit` methods, with optional
+/// Provides `new_write` and `new_commit` methods, with optional
 /// `overwrite` support for partition-level overwrites.
 pub struct WriteBuilder<'a> {
     table: &'a Table,