sparrow-ipc 0.3.0
Loading...
Searching...
No Matches
serializer.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <sparrow/record_batch.hpp>
4
12
13namespace sparrow_ipc
14{
37 {
38 public:
39
48 template <writable_stream TStream>
49 serializer(TStream& stream, std::optional<CompressionType> compression = std::nullopt)
50 : m_stream(stream)
51 , m_compression(compression)
52 {
53 }
54
63
69 void write(const sparrow::record_batch& rb);
70
86 template <std::ranges::input_range R>
87 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
88 void write(const R& record_batches)
89 {
90 CompressionCache compressed_buffers_cache;
91 if (std::ranges::empty(record_batches))
92 {
93 return;
94 }
95
96 if (m_ended)
97 {
98 throw std::runtime_error("Cannot append to a serializer that has been ended");
99 }
100
101 // NOTE `reserve_function` is making us store a cache for the compressed buffers at this level.
102 // The benefit of capacity allocation should be evaluated vs storing a cache of compressed buffers
103 // of record batches.
104 const auto reserve_function = [&record_batches, &compressed_buffers_cache, this]()
105 {
107 record_batches,
108 m_stream.size(),
109 m_schema_received,
110 m_compression,
111 m_dict_tracker,
112 compressed_buffers_cache
113 );
114 };
115
116 m_stream.reserve(reserve_function);
117
118 if (!m_schema_received)
119 {
120 m_schema_received = true;
121 m_dtypes = get_column_dtypes(*record_batches.begin());
122 serialize_schema_message(*record_batches.begin(), m_stream);
123 }
124
125 for (const auto& rb : record_batches)
126 {
127 if (get_column_dtypes(rb) != m_dtypes)
128 {
129 throw std::invalid_argument("Record batch schema does not match serializer schema");
130 }
131
132 for_each_pending_dictionary(rb, m_dict_tracker, [&](const dictionary_info& dict_info)
133 {
135 dict_info.id,
136 dict_info.data,
137 dict_info.is_delta,
138 m_stream,
139 m_compression,
140 compressed_buffers_cache
141 );
142 });
143
144 serialize_record_batch(rb, m_stream, m_compression, compressed_buffers_cache);
145 }
146 }
147
164 serializer& operator<<(const sparrow::record_batch& rb)
165 {
166 write(rb);
167 return *this;
168 }
169
188 template <std::ranges::input_range R>
189 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
190 serializer& operator<<(const R& record_batches)
191 {
192 write(record_batches);
193 return *this;
194 }
195
211 {
212 return manip(*this);
213 }
214
/**
 * @brief Finalizes the serialization by writing an end-of-stream marker; defined out of line.
 * NOTE(review): presumably sets m_ended, after which the range write() overload throws — confirm.
 */
void end();
226
227 private:
228
229 static std::vector<sparrow::data_type> get_column_dtypes(const sparrow::record_batch& rb);
230
231 bool m_schema_received{false};
232 std::vector<sparrow::data_type> m_dtypes;
233 any_output_stream m_stream;
234 bool m_ended{false};
235 std::optional<CompressionType> m_compression;
236 dictionary_tracker m_dict_tracker;
237 };
238
240 {
241 serializer.end();
242 return serializer;
243 }
244}
Type-erased wrapper for any stream-like object.
A class for serializing Apache Arrow record batches to an output stream.
void write(const sparrow::record_batch &rb)
Writes a record batch to the serializer.
serializer(TStream &stream, std::optional< CompressionType > compression=std::nullopt)
Constructs a serializer object with a reference to a stream.
void write(const R &record_batches)
Writes a collection of record batches to the stream.
void end()
Finalizes the serialization process by writing an end-of-stream marker.
serializer & operator<<(const sparrow::record_batch &rb)
serializer & operator<<(const R &record_batches)
~serializer()
Destructor for the serializer.
serializer & operator<<(serializer &(*manip)(serializer &))
#define SPARROW_IPC_API
Definition config.hpp:12
std::size_t calculate_serializer_reserve_size(const R &record_batches, std::size_t current_stream_size, bool schema_received, std::optional< CompressionType > compression, const dictionary_tracker &dict_tracker, std::optional< std::reference_wrapper< CompressionCache > > cache=std::nullopt)
void for_each_pending_dictionary(const sparrow::record_batch &record_batch, dictionary_tracker &tracker, Func visitor)
SPARROW_IPC_API serialized_record_batch_info serialize_record_batch(const sparrow::record_batch &record_batch, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a record batch into a binary format following the Arrow IPC specification.
SPARROW_IPC_API void serialize_schema_message(const sparrow::record_batch &record_batch, any_output_stream &stream)
Serializes the schema message of a record batch and writes it to the output stream.
SPARROW_IPC_API serialized_record_batch_info serialize_dictionary_batch(int64_t dictionary_id, const sparrow::record_batch &record_batch, bool is_delta, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a dictionary batch into a binary format following the Arrow IPC specification.
serializer & end_stream(serializer &serializer)
Information about a dictionary used for encoding.
bool is_delta
Whether this is a delta update.
sparrow::record_batch data
Dictionary values as a single-column record batch.
int64_t id
Dictionary identifier.