Nix 2.93.3
Lix: A modern, delicious implementation of the Nix package manager; unstable internal interfaces
Loading...
Searching...
No Matches
serialise.hh
Go to the documentation of this file.
1#pragma once
3
4#include <memory>
5
6#include "lix/libutil/charptr-cast.hh"
10
11namespace nix {
12
13
/**
 * Abstract destination for a stream of bytes.
 *
 * Implementations override operator() to consume each chunk they are handed.
 */
struct Sink
{
    virtual ~Sink() = default;

    /** Consume one chunk of data. */
    virtual void operator()(std::string_view data) = 0;

    /** Health indicator; the base implementation always reports `true`. */
    virtual bool good()
    {
        return true;
    }
};
23
27struct NullSink : Sink
28{
29 void operator () (std::string_view data) override
30 { }
31};
32
33
34struct FinishSink : virtual Sink
35{
36 virtual void finish() = 0;
37};
38
39
44struct BufferedSink : virtual Sink
45{
46 size_t bufSize, bufPos;
47 std::unique_ptr<char[]> buffer;
48
49 BufferedSink(size_t bufSize = 32 * 1024)
50 : bufSize(bufSize), bufPos(0), buffer(nullptr) { }
51
52 void operator () (std::string_view data) override;
53
54 void flush();
55
56protected:
57
58 virtual void writeUnbuffered(std::string_view data) = 0;
59};
60
61
65struct Source
66{
67 virtual ~Source() { }
68
74 void operator () (char * data, size_t len);
75
86 virtual size_t read(char * data, size_t len) = 0;
87
88 virtual bool good() { return true; }
89
90 void drainInto(Sink & sink);
91
92 std::string drain();
93};
94
95
100struct BufferedSource : Source
101{
102 size_t bufSize, bufPosIn, bufPosOut;
103 std::unique_ptr<char[]> buffer;
104
105 BufferedSource(size_t bufSize = 32 * 1024)
106 : bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(nullptr) { }
107
108 size_t read(char * data, size_t len) override;
109
110 bool hasData();
111
112protected:
116 virtual size_t readUnbuffered(char * data, size_t len) = 0;
117};
118
119
123struct FdSink : BufferedSink
124{
125 int fd;
126 size_t written = 0;
127
128 FdSink() : fd(-1) { }
129 FdSink(int fd) : fd(fd) { }
130 FdSink(FdSink&&) = default;
131
132 FdSink & operator=(FdSink && s)
133 {
134 flush();
135 fd = s.fd;
136 s.fd = -1;
137 written = s.written;
138 return *this;
139 }
140
141 ~FdSink();
142
143 void writeUnbuffered(std::string_view data) override;
144
145 bool good() override;
146
147private:
148 bool _good = true;
149};
150
151
155struct FdSource : BufferedSource
156{
157 int fd;
158 size_t read = 0;
160 std::optional<std::string> specialEndOfFileError;
161
162 std::string endOfFileError() const;
163
164 FdSource() : fd(-1) { }
165 FdSource(int fd) : fd(fd) { }
166 FdSource(FdSource &&) = default;
167
168 FdSource & operator=(FdSource && s)
169 {
170 fd = s.fd;
171 s.fd = -1;
172 read = s.read;
173 return *this;
174 }
175
176 bool good() override;
177protected:
178 size_t readUnbuffered(char * data, size_t len) override;
179private:
180 bool _good = true;
181};
182
183
187struct StringSink : Sink
188{
189 std::string s;
190 StringSink() { }
191 explicit StringSink(const size_t reservedSize)
192 {
193 s.reserve(reservedSize);
194 };
195 StringSink(std::string && s) : s(std::move(s)) { };
196 void operator () (std::string_view data) override;
197};
198
199
203struct StringSource : Source
204{
205 std::string_view s;
206 size_t pos;
207 StringSource(std::string_view s) : s(s), pos(0) { }
208 size_t read(char * data, size_t len) override;
209};
210
211
215struct TeeSink : Sink
216{
217 Sink & sink1, & sink2;
218 TeeSink(Sink & sink1, Sink & sink2) : sink1(sink1), sink2(sink2) { }
219 virtual void operator () (std::string_view data) override
220 {
221 sink1(data);
222 sink2(data);
223 }
224};
225
226
230struct TeeSource : Source
231{
232 Source & orig;
233 Sink & sink;
234 TeeSource(Source & orig, Sink & sink)
235 : orig(orig), sink(sink) { }
236 size_t read(char * data, size_t len) override
237 {
238 size_t n = orig.read(data, len);
239 sink({data, n});
240 return n;
241 }
242};
243
247struct SizedSource : Source
248{
249 Source & orig;
250 size_t remain;
251 SizedSource(Source & orig, size_t size)
252 : orig(orig), remain(size) { }
253 size_t read(char * data, size_t len) override
254 {
255 if (this->remain <= 0) {
256 throw EndOfFile("sized: unexpected end-of-file");
257 }
258 len = std::min(len, this->remain);
259 size_t n = this->orig.read(data, len);
260 this->remain -= n;
261 return n;
262 }
263
267 size_t drainAll()
268 {
269 std::vector<char> buf(8192);
270 size_t sum = 0;
271 while (this->remain > 0) {
272 size_t n = read(buf.data(), buf.size());
273 sum += n;
274 }
275 return sum;
276 }
277};
278
283{
284 uint64_t length = 0;
285
286 void operator () (std::string_view data) override
287 {
288 length += data.size();
289 }
290};
291
295struct LambdaSink : Sink
296{
297 typedef std::function<void(std::string_view data)> lambda_t;
298
299 lambda_t lambda;
300
301 LambdaSink(const lambda_t & lambda) : lambda(lambda) { }
302
303 void operator () (std::string_view data) override
304 {
305 lambda(data);
306 }
307};
308
309
313struct LambdaSource : Source
314{
315 typedef std::function<size_t(char *, size_t)> lambda_t;
316
317 lambda_t lambda;
318
319 LambdaSource(const lambda_t & lambda) : lambda(lambda) { }
320
321 size_t read(char * data, size_t len) override
322 {
323 return lambda(data, len);
324 }
325};
326
327struct GeneratorSource : Source
328{
329 GeneratorSource(Generator<Bytes> && g) : g(std::move(g)) {}
330
331 virtual size_t read(char * data, size_t len) override
332 {
333 // we explicitly do not poll the generator multiple times to fill the
334 // buffer, only to produce some output at all. this is allowed by the
335 // semantics of read(), only operator() must fill the buffer entirely
336 while (!buf.size()) {
337 if (auto next = g.next()) {
338 buf = *next;
339 } else {
340 throw EndOfFile("coroutine has finished");
341 }
342 }
343
344 len = std::min(len, buf.size());
345 memcpy(data, buf.data(), len);
346 buf = buf.subspan(len);
347 return len;
348 }
349
350private:
352 Bytes buf{};
353};
354
355inline Sink & operator<<(Sink & sink, Generator<Bytes> && g)
356{
357 while (auto buffer = g.next()) {
358 sink(std::string_view(buffer->data(), buffer->size()));
359 }
360 return sink;
361}
362
364using WireFormatGenerator = Generator<Bytes, SerializingTransform>;
365
367{
368 std::array<unsigned char, 8> buf;
369
370 Bytes operator()(uint64_t n)
371 {
372 buf[0] = n & 0xff;
373 buf[1] = (n >> 8) & 0xff;
374 buf[2] = (n >> 16) & 0xff;
375 buf[3] = (n >> 24) & 0xff;
376 buf[4] = (n >> 32) & 0xff;
377 buf[5] = (n >> 40) & 0xff;
378 buf[6] = (n >> 48) & 0xff;
379 buf[7] = (unsigned char) (n >> 56) & 0xff;
380 return {charptr_cast<const char *>(buf.begin()), 8};
381 }
382
383 static Bytes padding(size_t unpadded)
384 {
385 return Bytes("\0\0\0\0\0\0\0", unpadded % 8 ? 8 - unpadded % 8 : 0);
386 }
387
388 // opt in to generator chaining. without this co_yielding
389 // another generator of any type will cause a type error.
390 auto operator()(Generator<Bytes> && g)
391 {
392 return std::move(g);
393 }
394
395 // only choose this for *exactly* char spans, do not allow implicit
396 // conversions. this would cause ambiguities with strings literals,
397 // and resolving those with more string-like overloads needs a lot.
398 template<typename Span>
399 requires std::same_as<Span, std::span<char>> || std::same_as<Span, std::span<const char>>
400 Bytes operator()(Span s)
401 {
402 return s;
403 }
404 WireFormatGenerator operator()(std::string_view s);
405 WireFormatGenerator operator()(const Strings & s);
406 WireFormatGenerator operator()(const StringSet & s);
407 WireFormatGenerator operator()(const Error & s);
408};
409
410void writePadding(size_t len, Sink & sink);
411
412// NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines):
413// These coroutines do their entire job before the semicolon and are not
414// retained, so they live long enough.
415inline Sink & operator<<(Sink & sink, uint64_t u)
416{
417 return sink << [&]() -> WireFormatGenerator { co_yield u; }();
418}
419
420inline Sink & operator<<(Sink & sink, std::string_view s)
421{
422 return sink << [&]() -> WireFormatGenerator { co_yield s; }();
423}
424
425inline Sink & operator<<(Sink & sink, const Strings & s)
426{
427 return sink << [&]() -> WireFormatGenerator { co_yield s; }();
428}
429
430inline Sink & operator<<(Sink & sink, const StringSet & s)
431{
432 return sink << [&]() -> WireFormatGenerator { co_yield s; }();
433}
434
435inline Sink & operator<<(Sink & sink, const Error & ex)
436{
437 return sink << [&]() -> WireFormatGenerator { co_yield ex; }();
438}
439// NOLINTEND(cppcoreguidelines-avoid-capturing-lambda-coroutines)
440
441MakeError(SerialisationError, Error);
442
443template<typename T>
444T readNum(Source & source);
445
446inline unsigned int readInt(Source & source)
447{
448 return readNum<unsigned int>(source);
449}
450
451
452inline uint64_t readLongLong(Source & source)
453{
454 return readNum<uint64_t>(source);
455}
456
457
458void readPadding(size_t len, Source & source);
459size_t readString(char * buf, size_t max, Source & source);
460std::string readString(Source & source, size_t max = std::numeric_limits<size_t>::max());
461template<class T> T readStrings(Source & source);
462
463Source & operator >> (Source & in, std::string & s);
464
465template<typename T>
466Source & operator >> (Source & in, T & n)
467{
468 n = readNum<T>(in);
469 return in;
470}
471
472template<typename T>
473Source & operator >> (Source & in, bool & b)
474{
475 b = readNum<uint64_t>(in);
476 return in;
477}
478
479Error readError(Source & source);
480
481
485struct StreamToSourceAdapter : Source
486{
487 std::shared_ptr<std::basic_istream<char>> istream;
488
489 StreamToSourceAdapter(std::shared_ptr<std::basic_istream<char>> istream)
490 : istream(istream)
491 { }
492
493 size_t read(char * data, size_t len) override
494 {
495 if (!istream->read(data, len)) {
496 if (istream->eof()) {
497 if (istream->gcount() == 0)
498 throw EndOfFile("end of file");
499 } else
500 throw Error("I/O error in StreamToSourceAdapter");
501 }
502 return istream->gcount();
503 }
504};
505
506
515struct FramedSource : Source
516{
517 Source & from;
518 bool eof = false;
519 std::vector<char> pending;
520 size_t pos = 0;
521
522 FramedSource(Source & from) : from(from)
523 { }
524
525 ~FramedSource()
526 {
527 try {
528 if (!eof) {
529 while (true) {
530 auto n = readInt(from);
531 if (!n) break;
532 std::vector<char> data(n);
533 from(data.data(), n);
534 }
535 }
536 } catch (...) {
537 ignoreExceptionInDestructor();
538 }
539 }
540
541 size_t read(char * data, size_t len) override
542 {
543 if (eof) throw EndOfFile("reached end of FramedSource");
544
545 if (pos >= pending.size()) {
546 size_t len = readInt(from);
547 if (!len) {
548 eof = true;
549 return 0;
550 }
551 pending = std::vector<char>(len);
552 pos = 0;
553 from(pending.data(), len);
554 }
555
556 auto n = std::min(len, pending.size() - pos);
557 memcpy(data, pending.data() + pos, n);
558 pos += n;
559 return n;
560 }
561};
562
569struct FramedSink : nix::BufferedSink
570{
571 BufferedSink & to;
572 std::exception_ptr & ex;
573
574 FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex)
575 { }
576
577 ~FramedSink()
578 {
579 try {
580 to << 0;
581 to.flush();
582 } catch (...) {
583 ignoreExceptionInDestructor();
584 }
585 }
586
587 void writeUnbuffered(std::string_view data) override
588 {
589 /* Don't send more data if the remote has
590 encountered an error. */
591 if (ex) {
592 auto ex2 = ex;
593 ex = nullptr;
594 std::rethrow_exception(ex2);
595 }
596 to << data.size();
597 to(data);
598 };
599};
600
/* Hook for disabling the GC when entering a coroutine (without the boehm
   patch). It is a mutable function pointer so that libutil does not need
   a boehm gc dependency. */
extern std::shared_ptr<void> (*create_coro_gc_hook)();
605
606
607}
Definition serialise.hh:45
virtual size_t readUnbuffered(char *data, size_t len)=0
size_t read(char *data, size_t len) override
Definition serialise.cc:140
Definition serialise.hh:156
size_t readUnbuffered(char *data, size_t len) override
Definition serialise.cc:161
std::optional< std::string > specialEndOfFileError
Definition serialise.hh:160
Definition serialise.hh:35
size_t read(char *data, size_t len) override
Definition serialise.hh:541
virtual size_t read(char *data, size_t len) override
Definition serialise.hh:331
Definition generator.hh:236
size_t read(char *data, size_t len) override
Definition serialise.hh:321
Definition serialise.hh:283
Definition serialise.hh:28
Definition serialise.hh:367
Definition serialise.hh:18
size_t drainAll()
Definition serialise.hh:267
size_t read(char *data, size_t len) override
Definition serialise.hh:253
Definition serialise.hh:66
void operator()(char *data, size_t len)
Definition serialise.cc:108
virtual size_t read(char *data, size_t len)=0
size_t read(char *data, size_t len) override
Definition serialise.hh:493
size_t read(char *data, size_t len) override
Definition serialise.cc:186
size_t read(char *data, size_t len) override
Definition serialise.hh:236