Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c49fd56
Use Pinch to decode parquet metadata
Mar 4, 2026
faef937
WIP Implement Parquet reading using streamly
sharmrj Mar 8, 2026
2f95aa8
WIP: Parquet Refactor
sharmrj Mar 15, 2026
8dfea3c
Refactored the streaming parquet parser to return a stream of Columns…
sharmrj Mar 16, 2026
14f0399
Implemented a streaming parquet parser
sharmrj Mar 20, 2026
b29814a
copied over the tests for the parquet parser to test the unstable parser
sharmrj Mar 20, 2026
e0e5a70
Updated the pinch dependency constraints
sharmrj Mar 20, 2026
e0f25c9
Ran fourmolu on the changed files
sharmrj Mar 20, 2026
da0ecc1
ran fourmolu on `DataFrame.IO.Unstable.Parquet.Utils`
sharmrj Mar 20, 2026
622a261
Ran fourmolu on the new test file
sharmrj Mar 20, 2026
4c2e2ce
Fixed some hlint issues
sharmrj Mar 20, 2026
6abbe5c
Fixed an issue where the parquet parser was using ~2x the amount of m…
sharmrj Apr 4, 2026
ba5ff6a
Changed Parquet Zstd decompression to no longer stream
sharmrj Apr 4, 2026
61aa7d3
Use `FileBufferedOrSeekable` for the `RandomAccess` instance for `Loc…
sharmrj Apr 4, 2026
461769f
WIP: Streaming Parquet Reader
sharmrj Apr 19, 2026
f0e3f9e
Merge remote-tracking branch 'refs/remotes/upstream/main' into stream…
sharmrj Apr 19, 2026
0206cfe
WIP: Streaming Parquet Implementation
sharmrj Apr 19, 2026
9361f5a
Cleaned up RandomAccess.hs
sharmrj Apr 19, 2026
f349ef1
Implemented the remainder of the parquet parser; replaced functions t…
sharmrj Apr 19, 2026
fe60a50
Formatting
sharmrj Apr 19, 2026
5095e68
Removed an unused pragma
sharmrj Apr 19, 2026
bdc2219
Removed shadowed variable names; removed unused imports; added the La…
sharmrj Apr 19, 2026
1b21195
fourmolu
sharmrj Apr 19, 2026
e01ffc1
Fixed some compiler warnings
sharmrj Apr 19, 2026
3b47a88
Move Unstable module to the main parquet folder; Remove Unstable Module
sharmrj Apr 20, 2026
d4759b5
Fixed hlint errors
sharmrj Apr 20, 2026
b700ec6
Updated examples.cabal with the new parquet IO files
sharmrj Apr 20, 2026
1f0fe12
Removed a duplicate module in examples.cabal
sharmrj Apr 20, 2026
61c7500
Add `pinch` to the `build-depends` list in `examples.cabal`
sharmrj Apr 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions dataframe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,18 @@ library
DataFrame.Display.Terminal.Plot,
DataFrame.IO.CSV,
DataFrame.IO.JSON,
DataFrame.IO.Utils.RandomAccess,
DataFrame.IO.Parquet,
DataFrame.IO.Parquet.Binary,
DataFrame.IO.Parquet.Dictionary,
DataFrame.IO.Parquet.Levels,
DataFrame.IO.Parquet.Thrift,
DataFrame.IO.Parquet.ColumnStatistics,
DataFrame.IO.Parquet.Compression,
DataFrame.IO.Parquet.Decompress,
DataFrame.IO.Parquet.Encoding,
DataFrame.IO.Parquet.Page,
DataFrame.IO.Parquet.Utils,
DataFrame.IO.Parquet.Seeking,
DataFrame.IO.Parquet.Time,
DataFrame.IO.Parquet.Types,
DataFrame.Lazy.IO.CSV,
DataFrame.Lazy.IO.Binary,
DataFrame.Lazy.Internal.DataFrame,
Expand Down Expand Up @@ -141,9 +141,11 @@ library
stm >= 2.5 && < 3,
filepath >= 1.4 && < 2,
Glob >= 0.10 && < 1,
http-conduit >= 2.3 && < 3,
http-conduit >= 2.3 && < 3,
pinch >= 0.5.1.0 && <= 0.5.2.0,
streamly-core >= 0.2.3 && < 0.4,
streamly-bytestring >= 0.2.0 && < 0.4

hs-source-dirs: src
default-language: Haskell2010

Expand Down
13 changes: 7 additions & 6 deletions examples/examples.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,16 @@ executable examples
DataFrame.IO.JSON,
DataFrame.IO.Parquet,
DataFrame.IO.Parquet.Binary,
DataFrame.IO.Parquet.Decompress,
DataFrame.IO.Parquet.Dictionary,
DataFrame.IO.Parquet.Levels,
DataFrame.IO.Parquet.Thrift,
DataFrame.IO.Parquet.ColumnStatistics,
DataFrame.IO.Parquet.Compression,
DataFrame.IO.Parquet.Encoding,
DataFrame.IO.Parquet.Levels,
DataFrame.IO.Parquet.Page,
DataFrame.IO.Parquet.Seeking,
DataFrame.IO.Parquet.Thrift,
DataFrame.IO.Parquet.Time,
DataFrame.IO.Parquet.Types,
DataFrame.IO.Parquet.Utils,
DataFrame.IO.Utils.RandomAccess,
DataFrame.Lazy.IO.CSV,
DataFrame.Lazy.IO.Binary,
DataFrame.Lazy.Internal.DataFrame,
Expand All @@ -79,7 +80,6 @@ executable examples
DataFrame.Lazy.Internal.Executor,
DataFrame.Monad,
DataFrame.Hasktorch,
DataFrame.IO.Parquet.Seeking,
DataFrame.Internal.Binary,
DataFrame.Internal.Nullable,
DataFrame.Operators,
Expand Down Expand Up @@ -133,6 +133,7 @@ executable examples
stm >= 2.5 && < 3,
filepath >= 1.4 && < 2,
Glob >= 0.10 && < 1,
pinch >= 0.5.1.0 && <= 0.5.2.0,
if impl(ghc >= 9.12)
build-depends: ghc-typelits-natnormalise == 0.9.3
else
Expand Down
119 changes: 73 additions & 46 deletions src/DataFrame/Functions.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
Expand Down Expand Up @@ -42,11 +43,10 @@ import qualified Data.Text as T
import Data.Time
import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as VU
import Data.Word
import qualified DataFrame.IO.CSV as CSV
import qualified DataFrame.IO.Parquet as Parquet
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types (columnNullCount)

import DataFrame.Internal.Nullable (
BaseType,
NullLift1Op (applyNull1),
Expand All @@ -55,7 +55,6 @@ import DataFrame.Internal.Nullable (
NullLift2Result,
)
import DataFrame.Operators
import Debug.Trace (trace)
import Language.Haskell.TH
import qualified Language.Haskell.TH.Syntax as TH
import System.Directory (doesDirectoryExist)
Expand All @@ -71,7 +70,10 @@ lift f =

lift2 ::
(Columnable c, Columnable b, Columnable a) =>
(c -> b -> a) -> Expr c -> Expr b -> Expr a
(c -> b -> a) ->
Expr c ->
Expr b ->
Expr a
lift2 f =
Binary
( MkBinaryOp
Expand Down Expand Up @@ -161,7 +163,9 @@ unsafeCast colName =

castExpr ::
forall b src.
(Columnable b, Columnable src, Read b) => Expr src -> Expr (Maybe b)
(Columnable b, Columnable src, Read b) =>
Expr src ->
Expr (Maybe b)
castExpr = CastExprWith @b @(Maybe b) @src "castExpr" (either (const Nothing) Just)

castExprWithDefault ::
Expand All @@ -173,7 +177,9 @@ castExprWithDefault def =

castExprEither ::
forall b src.
(Columnable b, Columnable src, Read b) => Expr src -> Expr (Either T.Text b)
(Columnable b, Columnable src, Read b) =>
Expr src ->
Expr (Either T.Text b)
castExprEither =
CastExprWith @b @(Either T.Text b) @src
"castExprEither"
Expand Down Expand Up @@ -454,7 +460,11 @@ max = lift2Decorated Prelude.max "max" Nothing True 1

reduce ::
forall a b.
(Columnable a, Columnable b) => Expr b -> a -> (a -> b -> a) -> Expr a
(Columnable a, Columnable b) =>
Expr b ->
a ->
(a -> b -> a) ->
Expr a
reduce expr start f = Agg (FoldAgg "foldUdf" (Just start) f) expr
{-# INLINEABLE reduce #-}

Expand Down Expand Up @@ -492,21 +502,29 @@ fromJust = liftDecorated Maybe.fromJust "fromJust" Nothing

whenPresent ::
forall a b.
(Columnable a, Columnable b) => (a -> b) -> Expr (Maybe a) -> Expr (Maybe b)
(Columnable a, Columnable b) =>
(a -> b) ->
Expr (Maybe a) ->
Expr (Maybe b)
whenPresent f = liftDecorated (fmap f) "whenPresent" Nothing
{-# INLINEABLE whenPresent #-}

whenBothPresent ::
forall a b c.
(Columnable a, Columnable b, Columnable c) =>
(a -> b -> c) -> Expr (Maybe a) -> Expr (Maybe b) -> Expr (Maybe c)
(a -> b -> c) ->
Expr (Maybe a) ->
Expr (Maybe b) ->
Expr (Maybe c)
whenBothPresent f = lift2Decorated (\l r -> f <$> l <*> r) "whenBothPresent" Nothing False 0
{-# INLINEABLE whenBothPresent #-}

recode ::
forall a b.
(Columnable a, Columnable b, Show (a, b)) =>
[(a, b)] -> Expr a -> Expr (Maybe b)
[(a, b)] ->
Expr a ->
Expr (Maybe b)
recode mapping =
Unary
( MkUnaryOp
Expand All @@ -519,13 +537,20 @@ recode mapping =
recodeWithCondition ::
forall a b.
(Columnable a, Columnable b) =>
Expr b -> [(Expr a -> Expr Bool, b)] -> Expr a -> Expr b
Expr b ->
[(Expr a -> Expr Bool, b)] ->
Expr a ->
Expr b
recodeWithCondition fallback [] _val = fallback
recodeWithCondition fallback ((cond, val) : rest) expr = ifThenElse (cond expr) (lit val) (recodeWithCondition fallback rest expr)

recodeWithDefault ::
forall a b.
(Columnable a, Columnable b, Show (a, b)) => b -> [(a, b)] -> Expr a -> Expr b
(Columnable a, Columnable b, Show (a, b)) =>
b ->
[(a, b)] ->
Expr a ->
Expr b
recodeWithDefault d mapping =
Unary
( MkUnaryOp
Expand Down Expand Up @@ -579,7 +604,9 @@ daysBetween =
bind ::
forall a b m.
(Columnable a, Columnable (m a), Monad m, Columnable b, Columnable (m b)) =>
(a -> m b) -> Expr (m a) -> Expr (m b)
(a -> m b) ->
Expr (m a) ->
Expr (m b)
bind f = liftDecorated (>>= f) "bind" Nothing

{- | Window function: evaluate an expression partitioned by the given columns.
Expand Down Expand Up @@ -712,65 +739,67 @@ declareColumnsFromParquetFile path = do
let pat = if isDir then path </> "*.parquet" else path
matches <- liftIO $ glob pat
files <- liftIO $ filterM (fmap Prelude.not . doesDirectoryExist) matches
metas <- liftIO $ mapM (fmap fst . Parquet.readMetadataFromPath) files
metas <- liftIO $ mapM Parquet.readMetadataFromPath files
let nullableCols :: S.Set T.Text
nullableCols =
S.fromList
[ T.pack (last colPath)
| meta <- metas
, rg <- rowGroups meta
, cc <- rowGroupColumns rg
, let cm = columnMetaData cc
colPath = columnPathInSchema cm
, rg <- unField (row_groups meta)
, cc <- unField (rg_columns rg)
, Just cm <- [unField (cc_meta_data cc)]
, let colPath = map T.unpack (unField (cmd_path_in_schema cm))
, Prelude.not (null colPath)
, columnNullCount (columnStatistics cm) > 0
, let nc :: Int64
nc = case unField (cmd_statistics cm) of
Nothing -> 0
Just stats -> Maybe.fromMaybe 0 (unField $ stats_null_count stats)
, nc > 0
]
let df =
foldl
(\acc meta -> acc <> schemaToEmptyDataFrame nullableCols (schema meta))
(\acc meta -> acc <> schemaToEmptyDataFrame nullableCols (unField (schema meta)))
DataFrame.Internal.DataFrame.empty
metas
declareColumns df

schemaToEmptyDataFrame :: S.Set T.Text -> [SchemaElement] -> DataFrame
schemaToEmptyDataFrame nullableCols elems =
let leafElems = filter (\e -> numChildren e == 0) elems
let leafElems = filter (\e -> Maybe.fromMaybe 0 (unField e.num_children) == 0) elems
in fromNamedColumns (map (schemaElemToColumn nullableCols) leafElems)

schemaElemToColumn :: S.Set T.Text -> SchemaElement -> (T.Text, Column)
schemaElemToColumn nullableCols element =
let colName = elementName element
let colName = unField element.name
isNull = colName `S.member` nullableCols
column =
if isNull
then emptyNullableColumnForType (elementType element)
else emptyColumnForType (elementType element)
then emptyNullableColumnForType (unField element.schematype)
else emptyColumnForType (unField element.schematype)
in (colName, column)

emptyColumnForType :: TType -> Column
emptyColumnForType :: Maybe ThriftType -> Column
emptyColumnForType = \case
BOOL -> fromList @Bool []
BYTE -> fromList @Word8 []
I16 -> fromList @Int16 []
I32 -> fromList @Int32 []
I64 -> fromList @Int64 []
I96 -> fromList @Int64 []
FLOAT -> fromList @Float []
DOUBLE -> fromList @Double []
STRING -> fromList @T.Text []
Just (BOOLEAN _) -> fromList @Bool []
Just (INT32 _) -> fromList @Int32 []
Just (INT64 _) -> fromList @Int64 []
Just (INT96 _) -> fromList @Int64 []
Just (FLOAT _) -> fromList @Float []
Just (DOUBLE _) -> fromList @Double []
Just (BYTE_ARRAY _) -> fromList @T.Text []
Just (FIXED_LEN_BYTE_ARRAY _) -> fromList @T.Text []
other -> error $ "Unsupported parquet type for column: " <> show other

emptyNullableColumnForType :: TType -> Column
emptyNullableColumnForType :: Maybe ThriftType -> Column
emptyNullableColumnForType = \case
BOOL -> fromList @(Maybe Bool) []
BYTE -> fromList @(Maybe Word8) []
I16 -> fromList @(Maybe Int16) []
I32 -> fromList @(Maybe Int32) []
I64 -> fromList @(Maybe Int64) []
I96 -> fromList @(Maybe Int64) []
FLOAT -> fromList @(Maybe Float) []
DOUBLE -> fromList @(Maybe Double) []
STRING -> fromList @(Maybe T.Text) []
Just (BOOLEAN _) -> fromList @(Maybe Bool) []
Just (INT32 _) -> fromList @(Maybe Int32) []
Just (INT64 _) -> fromList @(Maybe Int64) []
Just (INT96 _) -> fromList @(Maybe Int64) []
Just (FLOAT _) -> fromList @(Maybe Float) []
Just (DOUBLE _) -> fromList @(Maybe Double) []
Just (BYTE_ARRAY _) -> fromList @(Maybe T.Text) []
Just (FIXED_LEN_BYTE_ARRAY _) -> fromList @(Maybe T.Text) []
other -> error $ "Unsupported parquet type for column: " <> show other

declareColumnsFromCsvWithOpts :: CSV.ReadOptions -> String -> DecsQ
Expand Down Expand Up @@ -798,8 +827,6 @@ declareColumnsWithPrefix' prefix df =
in
fmap concat $ forM specs $ \(raw, nm, tyStr) -> do
ty <- typeFromString (words tyStr)
let tyDisplay = if ' ' `elem` tyStr then "(" <> T.pack tyStr <> ")" else T.pack tyStr
trace (T.unpack (nm <> " :: Expr " <> tyDisplay)) pure ()
let n = mkName (T.unpack nm)
sig <- sigD n [t|Expr $(pure ty)|]
val <- valD (varP n) (normalB [|col $(TH.lift raw)|]) []
Expand Down
Loading
Loading