sparrow-ipc 1.0.1
Loading...
Searching...
No Matches
serializer.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <sparrow/record_batch.hpp>
4
12
13namespace sparrow_ipc
14{
37 {
38 public:
39
48 template <writable_stream TStream>
49 serializer(TStream& stream, std::optional<CompressionType> compression = std::nullopt)
50 : m_stream(stream)
51 , m_compression(compression)
52 {
53 }
54
/// Constructs a serializer for @p stream and immediately writes the schema message
/// derived from @p schema_batch, so subsequent writes do not emit it again.
/// The column data types of @p schema_batch become the schema every later batch
/// must match.
/// NOTE(review): the line carrying the constructor name (`serializer(`) is missing from
/// this listing; the parameter list below belongs to it.
67 template <writable_stream TStream>
69 TStream& stream,
70 const sparrow::record_batch& schema_batch,
71 std::optional<CompressionType> compression = std::nullopt
72 )
73 : m_stream(stream)
74 , m_compression(compression)
// NOTE(review): m_schema_received and m_dtypes are declared before m_stream, so they are
// initialized first regardless of the order written here. Harmless (no member depends
// on another) but it triggers -Wreorder.
75 , m_schema_received(true)
// Recorded so later writes can reject batches whose column types differ.
76 , m_dtypes(get_column_dtypes(schema_batch))
77 {
// Runs after all members are initialized, so m_stream is ready to be written to.
78 serialize_schema_message(schema_batch, m_stream);
79 }
80
89
/// Writes a single record batch to the serializer (defined out of line).
/// NOTE(review): the body is not visible here; presumably it applies the same
/// ended/schema checks as the range overload of write() — confirm.
95 void write(const sparrow::record_batch& rb);
96
112 template <std::ranges::input_range R>
113 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
114 void write(const R& record_batches)
115 {
116 CompressionCache compressed_buffers_cache;
117 if (std::ranges::empty(record_batches))
118 {
119 return;
120 }
121
122 if (m_ended)
123 {
124 throw std::runtime_error("Cannot append to a serializer that has been ended");
125 }
126
127 // NOTE `reserve_function` is making us store a cache for the compressed buffers at this level.
128 // The benefit of capacity allocation should be evaluated vs storing a cache of compressed buffers
129 // of record batches.
130 const auto reserve_function = [&record_batches, &compressed_buffers_cache, this]()
131 {
133 record_batches,
134 m_stream.size(),
135 m_schema_received,
136 m_compression,
137 m_dict_tracker,
138 compressed_buffers_cache
139 );
140 };
141
142 m_stream.reserve(reserve_function);
143
144 if (!m_schema_received)
145 {
146 m_schema_received = true;
147 m_dtypes = get_column_dtypes(*record_batches.begin());
148 serialize_schema_message(*record_batches.begin(), m_stream);
149 }
150
151 for (const auto& rb : record_batches)
152 {
153 if (get_column_dtypes(rb) != m_dtypes)
154 {
155 throw std::invalid_argument("Record batch schema does not match serializer schema");
156 }
157
158 for_each_pending_dictionary(rb, m_dict_tracker, [&](const dictionary_info& dict_info)
159 {
161 dict_info.id,
162 dict_info.data,
163 dict_info.is_delta,
164 m_stream,
165 m_compression,
166 compressed_buffers_cache
167 );
168 });
169
170 serialize_record_batch(rb, m_stream, m_compression, compressed_buffers_cache);
171 }
172 }
173
190 serializer& operator<<(const sparrow::record_batch& rb)
191 {
192 write(rb);
193 return *this;
194 }
195
214 template <std::ranges::input_range R>
215 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
216 serializer& operator<<(const R& record_batches)
217 {
218 write(record_batches);
219 return *this;
220 }
221
/// Applies a stream manipulator (a function taking and returning serializer&, such as
/// end_stream) to this serializer and returns whatever the manipulator returns.
/// NOTE(review): the declaring line — `serializer& operator<<(serializer& (*manip)(serializer&))`
/// per the member index — is missing from this listing.
237 {
238 return manip(*this);
239 }
240
/// Finalizes serialization by writing the end-of-stream marker (defined out of line).
/// After this, the range overload of write() throws std::runtime_error for non-empty
/// input because it checks m_ended; presumably end() is what sets that flag — confirm.
251 void end();
252
253 private:
254
// Returns the data types of rb's columns (presumably one entry per column, in order);
// compared against m_dtypes to reject batches that do not match the schema.
255 static std::vector<sparrow::data_type> get_column_dtypes(const sparrow::record_batch& rb);
256
// NOTE(review): members are initialized in this declaration order, which differs from the
// schema-batch constructor's initializer list (m_stream, m_compression, m_schema_received,
// m_dtypes) and triggers -Wreorder. Reordering would silence it but changes the layout of
// an exported class (ABI), so it needs a deliberate decision.
// True once the schema message has been written (at construction or on the first write).
257 bool m_schema_received{false};
// Column data types of the written schema; every later batch must match exactly.
258 std::vector<sparrow::data_type> m_dtypes;
// Type-erased destination stream all messages are written to.
259 any_output_stream m_stream;
// Checked by write(); non-empty writes throw once this is true.
260 bool m_ended{false};
// Codec passed to every record/dictionary batch serialization; std::nullopt means none.
261 std::optional<CompressionType> m_compression;
// State used by for_each_pending_dictionary to decide which dictionaries still need emitting.
262 dictionary_tracker m_dict_tracker;
263 };
264
/// Manipulator that finalizes @p serializer by calling end() and returns it, so it can be
/// used through the manipulator operator<<, e.g. `ser << batches << end_stream;`.
/// NOTE(review): the signature line (`serializer& end_stream(serializer& serializer)`, per
/// the index) is missing from this listing.
266 {
267 serializer.end();
268 return serializer;
269 }
270}
Type-erased wrapper for any stream-like object.
A class for serializing Apache Arrow record batches to an output stream.
void write(const sparrow::record_batch &rb)
Writes a record batch to the serializer.
serializer(TStream &stream, std::optional< CompressionType > compression=std::nullopt)
Constructs a serializer object with a reference to a stream.
void write(const R &record_batches)
Writes a collection of record batches to the stream.
void end()
Finalizes the serialization process by writing the end-of-stream marker.
serializer(TStream &stream, const sparrow::record_batch &schema_batch, std::optional< CompressionType > compression=std::nullopt)
Constructs a serializer object with a reference to a stream and a schema.
serializer & operator<<(const sparrow::record_batch &rb)
serializer & operator<<(const R &record_batches)
~serializer()
Destructor for the serializer.
serializer & operator<<(serializer &(*manip)(serializer &))
#define SPARROW_IPC_API
Definition config.hpp:12
std::size_t calculate_serializer_reserve_size(const R &record_batches, std::size_t current_stream_size, bool schema_received, std::optional< CompressionType > compression, const dictionary_tracker &dict_tracker, std::optional< std::reference_wrapper< CompressionCache > > cache=std::nullopt)
void for_each_pending_dictionary(const sparrow::record_batch &record_batch, dictionary_tracker &tracker, Func visitor)
SPARROW_IPC_API serialized_record_batch_info serialize_record_batch(const sparrow::record_batch &record_batch, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a record batch into a binary format following the Arrow IPC specification.
SPARROW_IPC_API void serialize_schema_message(const sparrow::record_batch &record_batch, any_output_stream &stream)
Serializes the schema message for a record batch and writes it to the output stream.
SPARROW_IPC_API serialized_record_batch_info serialize_dictionary_batch(int64_t dictionary_id, const sparrow::record_batch &record_batch, bool is_delta, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a dictionary batch into a binary format following the Arrow IPC specification.
serializer & end_stream(serializer &serializer)
Information about a dictionary used for encoding.
bool is_delta
Whether this is a delta update.
sparrow::record_batch data
Dictionary values as a single-column record batch.
int64_t id
Dictionary identifier.