sparrow-ipc 0.3.0
Loading...
Searching...
No Matches
stream_file_serializer.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <cstddef>
4#include <optional>
5#include <vector>
6
7#include <sparrow/record_batch.hpp>
8
18
19namespace sparrow_ipc
20{
27 {
28 int64_t offset;
30 int64_t body_length;
31 };
32
42 const sparrow::record_batch& record_batch,
43 const std::vector<record_batch_block>& dictionary_blocks,
44 const std::vector<record_batch_block>& record_batch_blocks,
45 any_output_stream& stream
46 );
47
/// Deserializes a buffer holding data in the Arrow IPC file format into record batches.
/// @param data Raw bytes of the Arrow IPC file.
/// @return The record batches read from the file (marked [[nodiscard]]).
69 [[nodiscard]] SPARROW_IPC_API std::vector<sparrow::record_batch>
70 deserialize_file(std::span<const uint8_t> data);
71
95 {
96 public:
97
106 template <writable_stream TStream>
107 stream_file_serializer(TStream& stream, std::optional<CompressionType> compression = std::nullopt)
108 : m_stream(stream)
109 , m_compression(compression)
110 {
111 }
112
121
/// Writes a single record batch to the file.
/// @param rb Record batch to append.
/// NOTE(review): defined out of line; presumably it follows the same rules as the
/// range overload of write() (header on first write, column-type check,
/// pending dictionaries first) -- confirm in the definition.
129 void write(const sparrow::record_batch& rb);
130
150 template <std::ranges::input_range R>
151 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
152 void write(const R& record_batches)
153 {
154 CompressionCache compressed_buffers_cache;
155 if (std::ranges::empty(record_batches))
156 {
157 return;
158 }
159
160 if (m_ended)
161 {
162 throw std::runtime_error("Cannot write to a file serializer that has been ended");
163 }
164
165 // Write file header magic on first write
166 if (!m_header_written)
167 {
169 m_stream.add_padding();
170 m_header_written = true;
171 }
172
173 // NOTE `reserve_function` is making us store a cache for the compressed buffers at this level.
174 // The benefit of capacity allocation should be evaluated vs storing a cache of compressed buffers
175 // of record batches.
176 const auto reserve_function = [&record_batches, &compressed_buffers_cache, this]()
177 {
179 record_batches,
180 m_stream.size(),
184 compressed_buffers_cache
185 );
186 };
187
188 m_stream.reserve(reserve_function);
189
191 {
192 m_schema_received = true;
193 m_first_record_batch = *record_batches.begin();
194 m_dtypes = get_column_dtypes(*record_batches.begin());
195 serialize_schema_message(*record_batches.begin(), m_stream);
196 }
197
198 for (const auto& rb : record_batches)
199 {
200 if (get_column_dtypes(rb) != m_dtypes)
201 {
202 throw std::invalid_argument("Record batch schema does not match file serializer schema");
203 }
204
206 {
207 if (m_dict_tracker.is_emitted(dict_info.id) && !dict_info.is_delta)
208 {
209 throw std::runtime_error(
210 "Arrow file format does not support multiple non-delta dictionary batches "
211 "for the same dictionary id"
212 );
213 }
214
215 const int64_t dict_offset = static_cast<int64_t>(m_stream.size());
216 const auto dict_block_info = serialize_dictionary_batch(
217 dict_info.id,
218 dict_info.data,
219 dict_info.is_delta,
220 m_stream,
222 compressed_buffers_cache
223 );
224 m_dictionary_blocks.emplace_back(
225 dict_offset,
226 dict_block_info.metadata_length,
227 dict_block_info.body_length
228 );
229 });
230
231 // Offset is from the start of the file to the record batch message
232 const int64_t offset = static_cast<int64_t>(m_stream.size());
233
234 // Serialize and get block info
235 const auto info = serialize_record_batch(rb, m_stream, m_compression, compressed_buffers_cache);
236
237 m_record_batch_blocks.emplace_back(offset, info.metadata_length, info.body_length);
238 }
239 }
240
257 stream_file_serializer& operator<<(const sparrow::record_batch& rb)
258 {
259 write(rb);
260 return *this;
261 }
262
281 template <std::ranges::input_range R>
282 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
283 stream_file_serializer& operator<<(const R& record_batches)
284 {
285 write(record_batches);
286 return *this;
287 }
288
304 {
305 return manip(*this);
306 }
307
/// Finalizes the file by writing the footer and the trailing magic bytes.
/// Once the serializer is ended (m_ended), the range overload of write() throws
/// std::runtime_error for any non-empty input.
/// NOTE(review): assumes end() is what sets m_ended -- confirm in the out-of-line definition.
324 void end();
325
326 bool m_header_written{false};
327 bool m_schema_received{false};
328 std::optional<sparrow::record_batch> m_first_record_batch;
329 std::vector<sparrow::data_type> m_dtypes;
331 bool m_ended{false};
332 std::optional<CompressionType> m_compression;
334 std::vector<record_batch_block> m_dictionary_blocks;
335 std::vector<record_batch_block> m_record_batch_blocks;
336 };
337
356}
Type-erased wrapper for any stream-like object.
Tracks dictionaries during serialization.
A class for serializing Apache Arrow record batches to an output stream.
void end()
Finalizes the serialization process by writing the end-of-stream marker.
A class for serializing Apache Arrow record batches to the IPC file format.
std::vector< record_batch_block > m_record_batch_blocks
stream_file_serializer & operator<<(stream_file_serializer &(*manip)(stream_file_serializer &))
std::vector< sparrow::data_type > m_dtypes
std::optional< CompressionType > m_compression
void write(const R &record_batches)
Writes a collection of record batches to the file.
std::optional< sparrow::record_batch > m_first_record_batch
~stream_file_serializer()
Destructor for the stream_file_serializer.
stream_file_serializer & operator<<(const sparrow::record_batch &rb)
void write(const sparrow::record_batch &rb)
Writes a single record batch to the file.
stream_file_serializer(TStream &stream, std::optional< CompressionType > compression=std::nullopt)
Constructs a stream_file_serializer object with a reference to a stream.
void end()
Finalizes the file serialization by writing footer and trailing magic bytes.
stream_file_serializer & operator<<(const R &record_batches)
std::vector< record_batch_block > m_dictionary_blocks
#define SPARROW_IPC_API
Definition config.hpp:12
std::size_t calculate_serializer_reserve_size(const R &record_batches, std::size_t current_stream_size, bool schema_received, std::optional< CompressionType > compression, const dictionary_tracker &dict_tracker, std::optional< std::reference_wrapper< CompressionCache > > cache=std::nullopt)
void for_each_pending_dictionary(const sparrow::record_batch &record_batch, dictionary_tracker &tracker, Func visitor)
SPARROW_IPC_API serialized_record_batch_info serialize_record_batch(const sparrow::record_batch &record_batch, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a record batch into a binary format following the Arrow IPC specification.
SPARROW_IPC_API size_t write_footer(const sparrow::record_batch &record_batch, const std::vector< record_batch_block > &dictionary_blocks, const std::vector< record_batch_block > &record_batch_blocks, any_output_stream &stream)
Writes the Arrow IPC file footer.
SPARROW_IPC_API void serialize_schema_message(const sparrow::record_batch &record_batch, any_output_stream &stream)
Serializes the schema message for a record batch and writes it to the output stream.
SPARROW_IPC_API serialized_record_batch_info serialize_dictionary_batch(int64_t dictionary_id, const sparrow::record_batch &record_batch, bool is_delta, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a dictionary batch into a binary format following the Arrow IPC specification.
SPARROW_IPC_API std::vector< sparrow::record_batch > deserialize_file(std::span< const uint8_t > data)
Deserializes Arrow IPC file format into a vector of record batches.
constexpr std::array< std::uint8_t, 8 > arrow_file_header_magic
Magic bytes with padding for file header (8 bytes total for alignment)
stream_file_serializer & end_file(stream_file_serializer &serializer)
SPARROW_IPC_API std::vector< sparrow::data_type > get_column_dtypes(const sparrow::record_batch &rb)
Information about a dictionary used for encoding.
bool is_delta
Whether this is a delta update.
sparrow::record_batch data
Dictionary values as a single-column record batch.
int64_t id
Dictionary identifier.
Represents a block entry (for a record batch or a dictionary batch) in the Arrow IPC file footer.
int64_t body_length
Length of the message body (data buffers) of the record batch or dictionary batch.
int32_t metadata_length
Length of the metadata (FlatBuffer message)
int64_t offset
Offset from the start of the file to the message (record batch or dictionary batch).