sparrow-ipc 1.0.1
Loading...
Searching...
No Matches
stream_file_serializer.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <cstddef>
4#include <optional>
5#include <vector>
6
7#include <sparrow/record_batch.hpp>
8
18
19namespace sparrow_ipc
20{
27 {
28 int64_t offset;
30 int64_t body_length;
31 };
32
42 const sparrow::record_batch& record_batch,
43 const std::vector<record_batch_block>& dictionary_blocks,
44 const std::vector<record_batch_block>& record_batch_blocks,
45 any_output_stream& stream
46 );
47
71 {
72 public:
73
82 template <writable_stream TStream>
83 stream_file_serializer(TStream& stream, std::optional<CompressionType> compression = std::nullopt)
84 : m_stream(stream)
85 , m_compression(compression)
86 {
87 }
88
101 template <writable_stream TStream>
103 TStream& stream,
104 const sparrow::record_batch& schema_batch,
105 std::optional<CompressionType> compression = std::nullopt
106 )
107 : m_stream(stream)
108 , m_compression(compression)
109 {
110 // Write file header magic
112 m_stream.add_padding();
113 m_header_written = true;
114
115 // Establish schema
116 m_schema_received = true;
117 m_first_record_batch = schema_batch;
118 m_dtypes = get_column_dtypes(schema_batch);
119 serialize_schema_message(schema_batch, m_stream);
120 }
121
130
138 void write(const sparrow::record_batch& rb);
139
159 template <std::ranges::input_range R>
160 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
161 void write(const R& record_batches)
162 {
163 CompressionCache compressed_buffers_cache;
164 if (std::ranges::empty(record_batches))
165 {
166 return;
167 }
168
169 if (m_ended)
170 {
171 throw std::runtime_error("Cannot write to a file serializer that has been ended");
172 }
173
174 // Write file header magic on first write
175 if (!m_header_written)
176 {
178 m_stream.add_padding();
179 m_header_written = true;
180 }
181
182 // NOTE `reserve_function` is making us store a cache for the compressed buffers at this level.
183 // The benefit of capacity allocation should be evaluated vs storing a cache of compressed buffers
184 // of record batches.
185 const auto reserve_function = [&record_batches, &compressed_buffers_cache, this]()
186 {
188 record_batches,
189 m_stream.size(),
193 compressed_buffers_cache
194 );
195 };
196
197 m_stream.reserve(reserve_function);
198
200 {
201 m_schema_received = true;
202 m_first_record_batch = *record_batches.begin();
203 m_dtypes = get_column_dtypes(*record_batches.begin());
204 serialize_schema_message(*record_batches.begin(), m_stream);
205 }
206
207 for (const auto& rb : record_batches)
208 {
209 if (get_column_dtypes(rb) != m_dtypes)
210 {
211 throw std::invalid_argument("Record batch schema does not match file serializer schema");
212 }
213
215 {
216 if (m_dict_tracker.is_emitted(dict_info.id) && !dict_info.is_delta)
217 {
218 throw std::runtime_error(
219 "Arrow file format does not support multiple non-delta dictionary batches "
220 "for the same dictionary id"
221 );
222 }
223
224 const int64_t dict_offset = static_cast<int64_t>(m_stream.size());
225 const auto dict_block_info = serialize_dictionary_batch(
226 dict_info.id,
227 dict_info.data,
228 dict_info.is_delta,
229 m_stream,
231 compressed_buffers_cache
232 );
233 m_dictionary_blocks.emplace_back(
234 dict_offset,
235 dict_block_info.metadata_length,
236 dict_block_info.body_length
237 );
238 });
239
240 // Offset is from the start of the file to the record batch message
241 const int64_t offset = static_cast<int64_t>(m_stream.size());
242
243 // Serialize and get block info
244 const auto info = serialize_record_batch(rb, m_stream, m_compression, compressed_buffers_cache);
245
246 m_record_batch_blocks.emplace_back(offset, info.metadata_length, info.body_length);
247 }
248 }
249
266 stream_file_serializer& operator<<(const sparrow::record_batch& rb)
267 {
268 write(rb);
269 return *this;
270 }
271
290 template <std::ranges::input_range R>
291 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
292 stream_file_serializer& operator<<(const R& record_batches)
293 {
294 write(record_batches);
295 return *this;
296 }
297
313 {
314 return manip(*this);
315 }
316
333 void end();
334
335 bool m_header_written{false};
336 bool m_schema_received{false};
337 std::optional<sparrow::record_batch> m_first_record_batch;
338 std::vector<sparrow::data_type> m_dtypes;
340 bool m_ended{false};
341 std::optional<CompressionType> m_compression;
343 std::vector<record_batch_block> m_dictionary_blocks;
344 std::vector<record_batch_block> m_record_batch_blocks;
345 };
346
365}
Type-erased wrapper for any stream-like object.
Tracks dictionaries during serialization.
A class for serializing Apache Arrow record batches to an output stream.
void end()
Finalizes the serialization process by writing the end-of-stream marker.
A class for serializing Apache Arrow record batches to the IPC file format.
std::vector< record_batch_block > m_record_batch_blocks
stream_file_serializer & operator<<(stream_file_serializer &(*manip)(stream_file_serializer &))
std::vector< sparrow::data_type > m_dtypes
std::optional< CompressionType > m_compression
void write(const R &record_batches)
Writes a collection of record batches to the file.
std::optional< sparrow::record_batch > m_first_record_batch
~stream_file_serializer()
Destructor for the stream_file_serializer.
stream_file_serializer & operator<<(const sparrow::record_batch &rb)
void write(const sparrow::record_batch &rb)
Writes a single record batch to the file.
stream_file_serializer(TStream &stream, std::optional< CompressionType > compression=std::nullopt)
Constructs a stream_file_serializer object with a reference to a stream.
void end()
Finalizes the file serialization by writing the footer and the trailing magic bytes.
stream_file_serializer & operator<<(const R &record_batches)
std::vector< record_batch_block > m_dictionary_blocks
stream_file_serializer(TStream &stream, const sparrow::record_batch &schema_batch, std::optional< CompressionType > compression=std::nullopt)
Constructs a stream_file_serializer object with a reference to a stream and a schema.
#define SPARROW_IPC_API
Definition config.hpp:12
std::size_t calculate_serializer_reserve_size(const R &record_batches, std::size_t current_stream_size, bool schema_received, std::optional< CompressionType > compression, const dictionary_tracker &dict_tracker, std::optional< std::reference_wrapper< CompressionCache > > cache=std::nullopt)
void for_each_pending_dictionary(const sparrow::record_batch &record_batch, dictionary_tracker &tracker, Func visitor)
SPARROW_IPC_API serialized_record_batch_info serialize_record_batch(const sparrow::record_batch &record_batch, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a record batch into a binary format following the Arrow IPC specification.
SPARROW_IPC_API size_t write_footer(const sparrow::record_batch &record_batch, const std::vector< record_batch_block > &dictionary_blocks, const std::vector< record_batch_block > &record_batch_blocks, any_output_stream &stream)
Writes the Arrow IPC file footer.
SPARROW_IPC_API void serialize_schema_message(const sparrow::record_batch &record_batch, any_output_stream &stream)
Serializes a schema message for a record batch and writes it to the output stream.
SPARROW_IPC_API serialized_record_batch_info serialize_dictionary_batch(int64_t dictionary_id, const sparrow::record_batch &record_batch, bool is_delta, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a dictionary batch into a binary format following the Arrow IPC specification.
constexpr std::array< std::uint8_t, 8 > arrow_file_header_magic
Magic bytes with padding for the file header (8 bytes total, for alignment).
stream_file_serializer & end_file(stream_file_serializer &serializer)
SPARROW_IPC_API std::vector< sparrow::data_type > get_column_dtypes(const sparrow::record_batch &rb)
Information about a dictionary used for encoding.
bool is_delta
Whether this is a delta update.
sparrow::record_batch data
Dictionary values as a single-column record batch.
int64_t id
Dictionary identifier.
Represents a block entry in the Arrow IPC file footer.
int64_t body_length
Length of the message body (data buffers) of the record batch or dictionary batch.
int32_t metadata_length
Length of the metadata (FlatBuffer message)
int64_t offset
Offset from the start of the file to the record batch (or dictionary batch) message.