Parquet reader refactor: move to a Streamly-based streaming pipeline (bounded memory, clearer structure, optional concurrency) #188
Conversation
… (Each column in a stream is a chunk in the larger column)
Eiko-Tokura left a comment
Thanks for the great work!
```haskell
readRanges = mapM readBytes

readSuffix :: Int -> m ByteString
```
```haskell
newtype ReaderIO r a = ReaderIO {runReaderIO :: r -> IO a}
```
Thanks for the great work! Would it be better to use the mature `ReaderT r IO` from mtl or transformers instead of rolling our own instances, i.e. `type ReaderIO r = ReaderT r IO`? If the intent was to avoid extra dependencies then I think it's fine. (My guess is that we will eventually need `StateT` for a state-accumulating writer anyway.)
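For reference, the swap suggested here is tiny against transformers (which ships with GHC): the instances all come for free. A minimal sketch — the `Env`/`getChunkSize` environment is illustrative, standing in for the PR's file handle:

```haskell
import Control.Monad.Trans.Reader (ReaderT (..), asks)

-- Replacing the hand-rolled newtype with transformers' ReaderT; the
-- Functor/Applicative/Monad/MonadIO instances are inherited.
type ReaderIO r = ReaderT r IO

-- Hypothetical environment standing in for the file handle.
newtype Env = Env {chunkSize :: Int}

-- An example action reading a field from the environment.
getChunkSize :: ReaderIO Env Int
getChunkSize = asks chunkSize

-- The old runner is just runReaderT under the synonym.
runReaderIO :: ReaderIO r a -> r -> IO a
runReaderIO = runReaderT
```

Call sites like `runReaderIO parseParquet handle` would be unchanged by this swap.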
You are correct that `ReaderT` is the right thing to use here (that's really the pattern I was going for, to account for multiple ways of reading data). I think we should hold off on adding the dependency until we actually need the monad transformer stack (or a different effect system, should we decide that's something we want).
```haskell
data Range = Range {offset :: !Integer, length :: !Int} deriving (Eq, Show)

class (Monad m) => RandomAccess m where
```
(Non-critical, just a remark.) Maybe we can try to merge the `RandomAccess` abstraction with `DataFrame.IO.Parquet.Seeking` into one interface at a later stage.
Yes. I'll be doing that as I remove the Unstable module and move the code into the current `DataFrame.IO` module.
```haskell
import qualified Data.ByteString as BS
import Data.Functor ((<&>))
import Data.List (foldl', transpose)
import qualified Data.Map as Map
```
I think we can use `Data.Map.Strict` by default; there is no need to be lazy here.
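To illustrate the suggestion: `Data.Map.Strict` is a drop-in swap for `Data.Map` (same `Map` type, same API), but it forces values on insertion, so accumulators built with `insertWith` don't pile up thunks. A small sketch (the `countOccurrences` example is ours, not from the PR):

```haskell
import qualified Data.Map.Strict as Map

-- With the strict API, insertWith forces the combined value immediately,
-- so counters like this don't accumulate a chain of (+1) thunks per key.
countOccurrences :: (Ord k) => [k] -> Map.Map k Int
countOccurrences = foldr (\k m -> Map.insertWith (+) k 1 m) Map.empty
```

Since both modules share the same `Map` type, only the import needs to change.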
```haskell
GZIP -> pure (LB.toStrict (GZip.decompress (BS.fromStrict compressed)))
other -> error ("Unsupported compression type: " ++ show other)

readPage :: CompressionCodec -> BS.ByteString -> IO (Maybe Page, BS.ByteString)
```
Not related and out of context: this looks like an `Unfold` to me.
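To make the remark concrete without pulling in streamly: `readPage`'s shape, `state -> m (Maybe item, state)`, is one small adapter away from the `state -> Maybe (item, state)` step that `Data.List.unfoldr` (and streamly's `Unfold.unfoldrM`, in its monadic form) expects. A dependency-free toy version, splitting a buffer into fixed-size "pages" — the names `readToyPage`/`asStep`/`pages` are illustrative, not from the PR:

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import Data.List (unfoldr)

-- Toy stand-in for readPage: yield the next fixed-size "page" and the
-- remaining bytes, or Nothing when the buffer is exhausted.
readToyPage :: Int -> BS.ByteString -> (Maybe BS.ByteString, BS.ByteString)
readToyPage n bs
  | BS.null bs = (Nothing, bs)
  | otherwise = let (page, rest) = BS.splitAt n bs in (Just page, rest)

-- Adapter from readPage's (Maybe item, state) shape to the
-- Maybe (item, state) shape an unfold step wants.
asStep :: (s -> (Maybe a, s)) -> s -> Maybe (a, s)
asStep f s = case f s of
  (Nothing, _) -> Nothing
  (Just a, s') -> Just (a, s')

-- Stream of pages driven by the unfold step.
pages :: Int -> BS.ByteString -> [BS.ByteString]
pages n = unfoldr (asStep (readToyPage n))
```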
```haskell
_ -> False

decompressData :: CompressionCodec -> BS.ByteString -> IO BS.ByteString
decompressData codec compressed = case codec of
```
The result of `decompressData` is used to produce a stream of `Page`s (`readPage`). This decompression is strict in nature; I'm not sure if we can do a lazy, on-demand decompression.
```haskell
result <- next
drainZstd result BS.empty (chunk : acc)
drainZstd (Zstd.Done final) _ acc =
  pure $ BS.concat (reverse (final : acc))
```
bytestring might have something similar to `fromListRevN` or `fromChunksRev`. If not, it should be easy to write our own.
We can avoid a list traversal and pre-allocate the resulting array, avoiding any unnecessary copies.
```haskell
mmapFileForeignPtr,
)

uncurry_ :: (a -> b -> c -> d) -> (a, b, c) -> d
```
You can maybe call this `uncurry3` or something? A `_` suffix generally signifies a discarded result. There are no rules though :-)
```haskell
unsafeToByteString :: VS.Vector Word8 -> ByteString
unsafeToByteString v = PS (castForeignPtr ptr) offset len
  where
    (ptr, offset, len) = VS.unsafeToForeignPtr v
```
This will cause a maintenance burden.
The core datatype has changed across different versions of bytestring.
We either have to constrain bytestring to specific versions or support multiple implementations here using CPP macros.
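One way to sidestep both the version pin and the CPP: `Data.ByteString.Internal` also exports `fromForeignPtr`/`toForeignPtr`, which have kept the `(ForeignPtr Word8, offset, length)` interface across the 0.10 → 0.11 representation change, so the triple from `VS.unsafeToForeignPtr` could be fed to `fromForeignPtr` instead of the `PS` constructor. A dependency-free round-trip sketch of the stable API (the `rebuild` name is ours):

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Internal as BSI

-- Take a ByteString apart and rebuild it via the version-stable
-- fromForeignPtr/toForeignPtr functions instead of pattern-matching on
-- the version-dependent PS constructor.
rebuild :: BS.ByteString -> BS.ByteString
rebuild bs =
  let (fp, off, len) = BSI.toForeignPtr bs
   in BSI.fromForeignPtr fp off len
```

With that, `unsafeToByteString` would be `BSI.fromForeignPtr (castForeignPtr ptr) offset len` and no constructor is touched.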
```haskell
sizes = map (fromIntegral . BS.index footer) [0 .. 3]
  in foldl' (.|.) 0 $ zipWith shiftL sizes [0, 8 .. 24]

parseColumns :: (RandomAccess r, MonadIO r) => FileMetadata -> [Stream r Column]
```
I don't like this: `[Stream r ColumnChunk]`. That said, I'm not in a position to suggest a better alternative.
Could you help me understand how this fits into the bigger picture? Does each element in this list correspond to a column?
Update: I think I see where this is used.
You can return a vector directly here. `Vector (Stream Column)` is easier to reason about than `[Stream Column]`.
FYI, `Data.Vector` == `Streamly.Data.Array` (boxed & unboxed).
```haskell
case Pinch.decode Pinch.compactProtocol rawMetadata of
  Left e -> error $ show e
  Right metadata -> return metadata
```
- You can use `maybe`.
- Use of `error` will make the control flow harder to reason with and manage later.
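A sketch of one `error`-free alternative for this spot: lift the decoder's `Either` into `IO` by throwing a typed exception, which callers can catch, instead of an uncatchable-by-type `error` call. The `DecodeError` wrapper and `decodeOrThrow` name are illustrative, not from the PR:

```haskell
import Control.Exception (Exception, throwIO, try)

-- A dedicated exception type so callers can catch decode failures
-- specifically, rather than pattern-matching on ErrorCall messages.
newtype DecodeError = DecodeError String
  deriving (Show)

instance Exception DecodeError

-- Turn a decoder's Either result into an IO action that throws a typed
-- exception on failure instead of calling error.
decodeOrThrow :: Either String a -> IO a
decodeOrThrow = either (throwIO . DecodeError) pure
```

At the call site this would read `metadata <- decodeOrThrow (first show (Pinch.decode ...))`, keeping the failure in the exception machinery rather than in partial code.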
```haskell
readParquetUnstable filepath = IO.withFile filepath IO.ReadMode $ \handle -> do
  runReaderIO parseParquet handle

parseParquet :: (RandomAccess r, MonadIO r) => r DataFrame
```
```diff
@@ -0,0 +1,672 @@
+{-# LANGUAGE DataKinds #-}
```
I've not reviewed this module. It mostly looks like necessary boilerplate.
```haskell
{- | Build a forest from a flat, depth-first schema list,
consuming elements and returning (tree, remaining).
-}
data SchemaTree = SchemaTree SchemaElement [SchemaTree]
```
This looks like a rose tree.
Is there already an existing library with the performance and representation ironed out?
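containers (bundled with GHC) already provides the rose tree as `Data.Tree.Tree`/`Forest`, so `SchemaTree` could be `Tree SchemaElement`. A toy sketch of the "consume a depth-first list, return (tree, remaining)" step against that type, with `(label, childCount)` pairs standing in for `SchemaElement`s (the `unflatten` names are ours):

```haskell
import Data.Tree (Tree (..))

-- Rebuild one rose tree from a depth-first list of (label, numChildren)
-- pairs, returning the tree and the unconsumed suffix.
unflatten :: [(a, Int)] -> (Tree a, [(a, Int)])
unflatten ((x, n) : rest) =
  let (kids, rest') = unflattenN n rest
   in (Node x kids, rest')
unflatten [] = error "unflatten: empty input"

-- Consume exactly k sibling subtrees from the front of the list.
unflattenN :: Int -> [(a, Int)] -> ([Tree a], [(a, Int)])
unflattenN 0 rest = ([], rest)
unflattenN k rest =
  let (t, rest') = unflatten rest
      (ts, rest'') = unflattenN (k - 1) rest'
   in (t : ts, rest'')
```

Using `Data.Tree` would also bring the usual `Functor`/`Foldable`/`Traversable` instances and utilities like `flatten` and `levels` for free.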
Solves #133 and #171.
- A `RandomAccess` class for abstracting random access on files (which can be extended to remote files as well).
- `decodePageData` is reused from the legacy parser.
- `Stream.unfoldEach`: we transpose our `RowGroup`s to get a `Stream` of `ColumnChunk`s, and we define an `Unfold` that yields the parsed `Column` given a `ColumnChunk` (just the part of the column that's relevant).
- We allocate with `newMutableColumn`, copy the `Column`s yielded by the stream into it using `copyIntoMutableColumn`, and then freeze the mutable column. So no growing is necessary.

Next steps
- `FIXED_LEN_BYTE_ARRAY`
- `DataFrame.IO.Parser.Types`
- `(a, ByteString)`. We might even try using a monad transformer stack along with that to clean up the code.
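The transpose step in the PR summary can be modelled with plain lists: each `RowGroup` contributes one chunk per column, so transposing the row-major groups yields, for every column, its chunks in order, which are then assembled into a single column. A dependency-free toy where `[Int]` stands in for a `ColumnChunk` (the `assembleColumns` name is ours):

```haskell
import Data.List (transpose)

-- Each inner list is a row group: one chunk (here, a list of values) per
-- column. Transposing groups the chunks by column; concatenating them
-- plays the role of copyIntoMutableColumn into a pre-sized column.
assembleColumns :: [[[Int]]] -> [[Int]]
assembleColumns rowGroups = map concat (transpose rowGroups)
```

In the real pipeline the per-column chunk list becomes a `Stream` of `ColumnChunk`s and the `concat` is the `Unfold` plus the mutable-column copy, but the data movement is the same shape.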