Parquet reader refactor: move to a Streamly-based streaming pipeline#188

Open
sharmrj wants to merge 29 commits into DataHaskell:main from sharmrj:streaming-parquet

Conversation


@sharmrj sharmrj commented Mar 20, 2026

Solves #133 and #171.

  • Introduces a RandomAccess class abstracting random access on files (which can be extended to remote files as well)
  • Uses Pinch to decode Thrift metadata
    • There are functions to convert between the relevant Pinch-based types and the types used by the legacy Parquet parser. We do this because we reuse decodePageData from the legacy parser.
  • The core nested loop is now implemented using Stream.unfoldEach; we transpose our RowGroups to get a Stream of ColumnChunks, and we define an Unfold that yields the parsed Column given a ColumnChunk (just the part of the column that's relevant).
  • We allocate the columns ahead of time using newMutableColumn, copy the Columns yielded by the stream into them using copyIntoMutableColumn, and then freeze the mutable columns, so no growing is necessary.
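The transposition step described above can be sketched with plain lists (the hypothetical `RowGroup`/`ColumnChunk`/`Column` aliases below are illustrative stand-ins; the PR itself uses Streamly's `Stream` and `Unfold` rather than lists):

```haskell
import Data.List (transpose)

-- Hypothetical stand-ins: a RowGroup holds one chunk per column.
type ColumnChunk = [Int]         -- values of one column within one row group
type RowGroup    = [ColumnChunk] -- one chunk per column
type Column      = [Int]         -- a fully assembled column

-- Transposing the row groups yields, per column, the list of that
-- column's chunks across all row groups; concatenating assembles the
-- column. In the PR the per-column chunk sequence is a Stream fed to
-- Stream.unfoldEach instead of being concatenated eagerly.
assembleColumns :: [RowGroup] -> [Column]
assembleColumns rowGroups = map concat (transpose rowGroups)
```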

Next steps

  • Implement getting the typeLength when the column has the type FIXED_LEN_BYTE_ARRAY
  • Implement reading with options
    • Take advantage of the fact that we're streaming columns to apply predicates during the parse rather than afterwards, as in the current stable parser
  • Reconcile the Thrift types with the types in DataFrame.IO.Parser.Types.
  • We could possibly use a ParserCombinator abstraction to deal with binary functions that return (a, ByteString). We might even try using a monad transformer stack along with that to clean up the code.
  • Parallelism


@Eiko-Tokura Eiko-Tokura left a comment


Thanks for the great work!

readRanges = mapM readBytes
readSuffix :: Int -> m ByteString

newtype ReaderIO r a = ReaderIO {runReaderIO :: r -> IO a}
Contributor
Thanks for the great work! Would it be better if we can use the mature ReaderT r IO from mtl or transformers instead of rolling our own instances, like using type ReaderIO r = ReaderT r IO? If the intent was to avoid extra dependencies then I think it's fine. (My guess is that we will eventually need StateT for a state accumulating writer anyway)
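For reference, the suggested equivalence, using `transformers` (a GHC boot library, so the dependency footprint is minimal). The `readWithEnv` example is hypothetical, just to show the alias in use:

```haskell
import Control.Monad.Trans.Reader (ReaderT (..))

-- The hand-rolled `newtype ReaderIO r a` is isomorphic to this alias;
-- runReaderT plays the role of runReaderIO.
type ReaderIO r = ReaderT r IO

-- Hypothetical action reading its environment (here just an Int).
readWithEnv :: ReaderIO Int String
readWithEnv = ReaderT (\env -> pure ("env = " ++ show env))
```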

Contributor Author

You are correct that ReaderT is the correct thing to use here (that's really the pattern I was going for to account for multiple ways of reading data). I think we should hold off on adding the dependency until we actually need the monad transformer stack (or a different effect system should we decide that's something we want).


data Range = Range {offset :: !Integer, length :: !Int} deriving (Eq, Show)

class (Monad m) => RandomAccess m where
Contributor

(non-critical, just a remark) Maybe we can merge the RandomAccess abstraction with DataFrame.IO.Parquet.Seeking into one interface at a later stage.

Contributor Author

Yes. I'll be doing that as I remove the Unstable module and move the code into the current DataFrame.IO module.

Comment thread src/DataFrame/IO/Unstable/Parquet.hs Outdated
import qualified Data.ByteString as BS
import Data.Functor ((<&>))
import Data.List (foldl', transpose)
import qualified Data.Map as Map
Contributor

I think we can use Data.Map.Strict by default; there is no need to be lazy here.
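To illustrate the difference (both modules share the same `Map` type; only the strictness of value insertion differs):

```haskell
import qualified Data.Map.Lazy as ML
import qualified Data.Map.Strict as MS

-- Data.Map re-exports Data.Map.Lazy: inserted values stay as thunks,
-- so an undefined value (or a space leak) only surfaces at lookup time.
lazyMap :: ML.Map String Int
lazyMap = ML.insert "k" (error "never forced") ML.empty

-- Data.Map.Strict forces each value to WHNF at insert time, so
-- bottoms and accumulating thunks surface immediately.
strictSum :: Int
strictSum = MS.foldr (+) 0 (MS.fromList [("a", 1), ("b", 2)])
```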

Contributor Author

Agreed

Contributor Author

As it turns out, the data declaration in DataFrame.Internal.DataFrame uses the Data.Map import. While I agree with you that we should probably make it Data.Map.Strict, I'm afraid that change has a footprint a bit too large for me to feel comfortable throwing it in here. We should follow up on it as its own task.


@adithyaov adithyaov left a comment


Good job with the PR!

I've made a surface-level review; I've not reviewed the functionality thoroughly.

Comment thread src/DataFrame/IO/Parquet/Page.hs Outdated
GZIP -> pure (LB.toStrict (GZip.decompress (BS.fromStrict compressed)))
other -> error ("Unsupported compression type: " ++ show other)

readPage :: CompressionCodec -> BS.ByteString -> IO (Maybe Page, BS.ByteString)

Not related and out of context: This looks like an Unfold to me.

Comment thread src/DataFrame/IO/Parquet/Page.hs Outdated
_ -> False

decompressData :: CompressionCodec -> BS.ByteString -> IO BS.ByteString
decompressData codec compressed = case codec of
@adithyaov Mar 29, 2026

The result of decompressData is used to produce a stream of Page (readPage). This decompression is strict in nature. I'm not sure if we can do lazy, on-demand decompression.

Contributor Author

Decompression happens per page. So in the step function of the columnChunk Unfold, we decompress each page in its entirety before we start processing it. I think there might be some value in reading each page lazily in the future, but I would prefer to keep it strict for now.

Comment thread src/DataFrame/IO/Parquet/Page.hs Outdated
result <- next
drainZstd result BS.empty (chunk : acc)
drainZstd (Zstd.Done final) _ acc =
pure $ BS.concat (reverse (final : acc))

bytestring might have something similar to fromListRevN or fromChunksRev. If not, it should be easy to write our own.
We can avoid a list traversal and pre-allocate the resulting array, avoiding any unnecessary copies.
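A minimal sketch of such a helper, assuming bytestring's `unsafeCreate` internals (the `fromChunksRev` name is hypothetical; semantically it equals `BS.concat . reverse`):

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Internal as BI
import qualified Data.ByteString.Unsafe as BU
import Data.Word (Word8)
import Foreign.Marshal.Utils (copyBytes)
import Foreign.Ptr (Ptr, castPtr, plusPtr)

-- Single-pass concat of chunks accumulated in reverse order:
-- pre-allocate the full buffer, then copy the chunks back to front,
-- avoiding the intermediate list produced by reverse.
fromChunksRev :: [BS.ByteString] -> BS.ByteString
fromChunksRev chunks = BI.unsafeCreate total (go total chunks)
  where
    total = sum (map BS.length chunks)
    go :: Int -> [BS.ByteString] -> Ptr Word8 -> IO ()
    go _ [] _ = pure ()
    go end (c : cs) dst = do
      let len = BS.length c
      -- The head of the list is the *last* chunk, so it lands at the end.
      BU.unsafeUseAsCStringLen c $ \(src, _) ->
        copyBytes (dst `plusPtr` (end - len)) (castPtr src) len
      go (end - len) cs dst
```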

Contributor Author

I looked into this at some depth, and I think the best way to approach this is to not use streaming ZSTD, since we know the uncompressed size up front from the PageHeader. So instead I use the function from Codec.Compression.Zstd.Base that directly calls the zstd decompress function:

    (ZSTD _) -> createAndTrim uncompressedSize $ \dstPtr ->
      let (srcFP, offset, compressedSize) = toForeignPtr compressed
      in withForeignPtr srcFP $ \srcPtr -> do
        result <- Zstd.decompress
                    dstPtr
                    uncompressedSize
                    (srcPtr `plusPtr` offset)
                    compressedSize
        case result of
          Left e -> error $ "ZSTD error: " <> e
          Right actualSize -> return actualSize

Wdyt @mchav?

Comment thread src/DataFrame/IO/Utils/RandomAccess.hs Outdated
mmapFileForeignPtr,
)

uncurry_ :: (a -> b -> c -> d) -> (a, b, c) -> d

You could maybe call this uncurry3 or something?
A suffix of _ generally signifies a discarded result. There are no rules though :-)

Comment thread src/DataFrame/IO/Utils/RandomAccess.hs Outdated
unsafeToByteString :: VS.Vector Word8 -> ByteString
unsafeToByteString v = PS (castForeignPtr ptr) offset len
  where
    (ptr, offset, len) = VS.unsafeToForeignPtr v

This will cause a maintenance burden.
The core datatype has changed across different versions of bytestring.
We either have to constrain bytestring to specific versions or support multiple implementations here using CPP macros.


Contributor Author

I'm looking at fromForeignPtr to avoid this problem. AFAICT it has the same API across bytestring 0.10.x to 0.12.x.
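A minimal sketch of the `fromForeignPtr` route (base-only buffer here, standing in for the storable vector's foreign pointer; its `ForeignPtr Word8 -> Int -> Int -> ByteString` shape has been stable across those bytestring versions, unlike the `PS` constructor):

```haskell
import qualified Data.ByteString as BS
import Data.ByteString.Internal (fromForeignPtr)
import Data.Word (Word8)
import Foreign.ForeignPtr (mallocForeignPtrBytes, withForeignPtr)
import Foreign.Storable (pokeByteOff)

-- Zero-copy wrap of an existing buffer as a ByteString via
-- fromForeignPtr (offset and length args), instead of the
-- version-sensitive PS constructor.
wrapBuffer :: IO BS.ByteString
wrapBuffer = do
  fp <- mallocForeignPtrBytes 3
  withForeignPtr fp $ \p -> do
    pokeByteOff p 0 (104 :: Word8) -- 'h'
    pokeByteOff p 1 (105 :: Word8) -- 'i'
    pokeByteOff p 2 (33 :: Word8)  -- '!'
  pure (fromForeignPtr fp 0 3)
```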

Comment thread src/DataFrame/IO/Unstable/Parquet.hs Outdated
sizes = map (fromIntegral . BS.index footer) [0 .. 3]
in foldl' (.|.) 0 $ zipWith shiftL sizes [0, 8 .. 24]

parseColumns :: (RandomAccess r, MonadIO r) => FileMetadata -> [Stream r Column]

I don't like this: [Stream r ColumnChunk]. That said, I'm not in a position to suggest a better alternative.
Could you help me understand how this fits into the bigger picture?
Does each element in this list correspond to a column?


Update: I think I see where this is used.
You could return a vector directly here.
Vector (Stream Column) is easier to reason about than [Stream Column].
FYI, Data.Vector == Streamly.Data.Array (boxed & unboxed)

Comment thread src/DataFrame/IO/Unstable/Parquet.hs Outdated
Comment on lines +85 to +87
case Pinch.decode Pinch.compactProtocol rawMetadata of
Left e -> error $ show e
Right metadata -> return metadata

  1. You can use maybe
  2. Use of error will make the control flow harder to reason about and manage later.
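A sketch of the suggested change (the `MetadataError` type and `decodeOrThrow` helper are hypothetical; since Pinch's decode returns an Either, `either` is the natural combinator here):

```haskell
import Control.Exception (Exception, throwIO)

-- Hypothetical typed exception for metadata decoding failures.
newtype MetadataError = MetadataError String
  deriving Show

instance Exception MetadataError

-- Lift a decoder's Either into IO via a typed, catchable exception
-- rather than an unrecoverable call to error.
decodeOrThrow :: Show e => Either e a -> IO a
decodeOrThrow = either (throwIO . MetadataError . show) pure
```

Callers can then handle the failure with `catch`/`try` instead of crashing.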

Comment thread src/DataFrame/IO/Unstable/Parquet/Thrift.hs Outdated
{- | Build a forest from a flat, depth-first schema list,
consuming elements and returning (tree, remaining).
-}
data SchemaTree = SchemaTree SchemaElement [SchemaTree]

This looks like a RoseTree.
Is there already an existing library with performance and representation ironed out?

Contributor Author
@sharmrj Apr 20, 2026

This is a RoseTree but it's a bit of a throwaway part of the code. It's a relatively cheap operation even on smaller parquet files, and not an especially convoluted part of the logic, so I would prefer to keep it as is rather than introduce another dependency.
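For context, the "consume elements and return (tree, remaining)" scheme from the quoted doc comment can be sketched as follows (the `SchemaElement` record here is a hypothetical stand-in carrying a child count, mirroring Parquet's num_children field; the real types live in the PR's Thrift module):

```haskell
-- Hypothetical flat schema element with a child count.
data SchemaElement = SchemaElement
  { seName        :: String
  , seNumChildren :: Int
  } deriving (Eq, Show)

data SchemaTree = SchemaTree SchemaElement [SchemaTree]
  deriving (Eq, Show)

-- Consume one subtree from a flat, depth-first schema list,
-- returning the tree and the remaining elements.
buildTree :: [SchemaElement] -> (SchemaTree, [SchemaElement])
buildTree []       = error "buildTree: empty schema list"
buildTree (e : es) = (SchemaTree e kids, rest)
  where
    (kids, rest) = buildN (seNumChildren e) es

-- Consume exactly n sibling subtrees.
buildN :: Int -> [SchemaElement] -> ([SchemaTree], [SchemaElement])
buildN 0 es = ([], es)
buildN n es =
  let (t, es')   = buildTree es
      (ts, es'') = buildN (n - 1) es'
   in (t : ts, es'')
```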

@sharmrj
Contributor Author

sharmrj commented Apr 19, 2026

The latest changes swap out a lot of the functions that use lists internally for ones that use vectors instead. Latest benchmarks:

benchmarking parquet/mtcars (non-nullable)/old (DataFrame.IO.Parquet)
time                 2.100 ms   (1.980 ms .. 2.188 ms)
                     0.973 R²   (0.957 R² .. 0.982 R²)
mean                 1.572 ms   (1.467 ms .. 1.684 ms)
std dev              392.0 μs   (350.2 μs .. 447.2 μs)
variance introduced by outliers: 95% (severely inflated)

benchmarking parquet/mtcars (non-nullable)/new (DataFrame.IO.Unstable.Parquet)
time                 408.0 μs   (404.2 μs .. 411.1 μs)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 404.9 μs   (402.9 μs .. 406.8 μs)
std dev              5.728 μs   (5.047 μs .. 7.286 μs)

benchmarking parquet/iris (non-nullable)/old (DataFrame.IO.Parquet)
time                 212.2 μs   (210.9 μs .. 213.4 μs)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 211.2 μs   (209.7 μs .. 212.9 μs)
std dev              5.231 μs   (3.613 μs .. 8.091 μs)
variance introduced by outliers: 19% (moderately inflated)

benchmarking parquet/iris (non-nullable)/new (DataFrame.IO.Unstable.Parquet)
time                 182.6 μs   (181.4 μs .. 183.9 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 182.0 μs   (181.4 μs .. 182.9 μs)
std dev              2.568 μs   (2.177 μs .. 3.056 μs)

benchmarking parquet/alltypes_plain (mixed types)/old (DataFrame.IO.Parquet)
time                 162.2 μs   (161.2 μs .. 163.1 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 161.8 μs   (161.1 μs .. 162.5 μs)
std dev              2.401 μs   (1.893 μs .. 3.262 μs)

benchmarking parquet/alltypes_plain (mixed types)/new (DataFrame.IO.Unstable.Parquet)
time                 280.5 μs   (279.1 μs .. 282.4 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 278.8 μs   (277.4 μs .. 280.0 μs)
std dev              3.988 μs   (3.000 μs .. 5.524 μs)

benchmarking parquet/1987 (9.5M)/old (DataFrame.IO.Parquet)
time                 8.997 s    (8.560 s .. 9.247 s)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 9.093 s    (9.029 s .. 9.130 s)
std dev              62.81 ms   (22.92 ms .. 85.48 ms)
variance introduced by outliers: 19% (moderately inflated)

benchmarking parquet/1987 (9.5M)/new (DataFrame.IO.Unstable.Parquet)
time                 3.698 s    (3.256 s .. 4.257 s)
                     0.997 R²   (0.991 R² .. 1.000 R²)
mean                 3.629 s    (3.549 s .. 3.685 s)
std dev              78.80 ms   (35.51 ms .. 104.9 ms)
variance introduced by outliers: 19% (moderately inflated)

@sharmrj
Contributor Author

sharmrj commented Apr 20, 2026

The latest round of changes introduces a couple more optimizations around avoiding intermediate lists. This results in detectable improvements on large datasets.


benchmarking parquet/mtcars (non-nullable)/DataFrame.IO.Parquet
time                 431.8 μs   (429.6 μs .. 434.9 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 432.8 μs   (431.5 μs .. 434.7 μs)
std dev              6.283 μs   (5.119 μs .. 7.658 μs)

benchmarking parquet/iris (non-nullable)/DataFrame.IO.Parquet
time                 198.9 μs   (198.0 μs .. 199.8 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 198.0 μs   (196.9 μs .. 198.8 μs)
std dev              2.932 μs   (2.349 μs .. 3.938 μs)

benchmarking parquet/alltypes_plain (mixed types)/DataFrame.IO.Parquet
time                 303.4 μs   (302.0 μs .. 305.1 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 301.5 μs   (299.3 μs .. 303.2 μs)
std dev              6.126 μs   (4.590 μs .. 7.982 μs)
variance introduced by outliers: 13% (moderately inflated)

benchmarking parquet/1987 (9.5M)/DataFrame.IO.Parquet
time                 2.887 s    (2.806 s .. NaN s)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 2.851 s    (2.829 s .. 2.871 s)
std dev              27.05 ms   (18.10 ms .. 32.91 ms)
variance introduced by outliers: 19% (moderately inflated)

We no longer have a comparison to the old parser, as it was replaced.

@sharmrj sharmrj changed the title Parquet reader refactor: move to a Streamly-based streaming pipeline (bounded memory, clearer structure, optional concurrency) Parquet reader refactor: move to a Streamly-based streaming pipeline Apr 20, 2026
