Nix 2.93.3
Lix: A modern, delicious implementation of the Nix package manager; unstable internal interfaces
Loading...
Searching...
No Matches
async-io.hh
Go to the documentation of this file.
1#pragma once
3
8#include <kj/async.h>
9#include <kj/common.h>
10#include <memory>
11#include <string_view>
12
13namespace nix {
14
15// not derived from kj's AsyncInputStream because read and tryRead are already
16// taken as method names, we don't need the other functions, and the bit about
17// minBytes does not work well with our current io model. some day, who knows?
18class AsyncInputStream : private kj::AsyncObject
19{
20public:
21 virtual ~AsyncInputStream() noexcept(false) {}
22
23 // expected to return 0 only on EOF or when `size = 0` was explicitly set.
24 virtual kj::Promise<Result<size_t>> read(void * buffer, size_t size) = 0;
25
26 kj::Promise<Result<void>> drainInto(Sink & sink);
27
28 kj::Promise<Result<std::string>> drain();
29};
30
31class AsyncSourceInputStream : public AsyncInputStream
32{
33 Source & inner;
34 // inner must reference owned if owned is set. we'll keep a unique_ptr
35 // field around in all instances to avoid duplicating the entire class
36 // into a reference variant and an owning variant (holding a box_ptr).
37 std::unique_ptr<Source> owned;
38
39public:
40 AsyncSourceInputStream(Source & inner) : inner(inner) {}
41 AsyncSourceInputStream(box_ptr<Source> inner) : inner(*inner), owned(std::move(inner).take()) {}
42
43 kj::Promise<Result<size_t>> read(void * buffer, size_t size) override;
44};
45
46class AsyncStringInputStream : public AsyncInputStream
47{
48 std::string_view s;
49
50public:
51 explicit AsyncStringInputStream(std::string_view s) : s(s) {}
52
53 kj::Promise<Result<size_t>> read(void * buffer, size_t size) override;
54};
55
56// this writes to sources instead of async streams because none of the sinks
57// we need to date are actually async, not because that wouldn't be possible
58class AsyncTeeInputStream : public AsyncInputStream
59{
60 AsyncInputStream & inner;
61 Sink & sink;
62
63public:
64 AsyncTeeInputStream(AsyncInputStream & inner, Sink & sink) : inner(inner), sink(sink) {}
65
66 kj::Promise<Result<size_t>> read(void * buffer, size_t size) override;
67};
68
69class AsyncGeneratorInputStream : public AsyncInputStream
70{
71private:
73 Bytes buf;
74
75public:
76 AsyncGeneratorInputStream(Generator<Bytes> && g) : g(std::move(g)) {}
77
78 kj::Promise<Result<size_t>> read(void * data, size_t len) override;
79};
80
81class AsyncFdInputStream : public AsyncInputStream
82{
83 int fd;
84 AutoCloseFD ownedFd; // only for closing automatically, must equal fd if set
85
86public:
87 struct shared_fd
88 {};
89
90 explicit AsyncFdInputStream(AutoCloseFD fd) : fd(fd.get()), ownedFd(std::move(fd)) {}
91 AsyncFdInputStream(shared_fd, int fd) : fd(fd) {}
92
93 kj::Promise<Result<size_t>> read(void * buffer, size_t size) override;
94};
95
100class IndirectAsyncInputStreamToSource : public Source
101{
102 struct Request
103 {
104 char * data;
105 size_t len;
106 std::promise<std::pair<size_t, kj::Own<kj::CrossThreadPromiseFulfiller<Request>>>> result;
107 };
108
109 struct Pipe
110 {
111 // used by the source implementation
112 kj::Own<kj::CrossThreadPromiseFulfiller<Request>> sendRequest;
113 // used by the async feeder function
114 kj::Promise<Request> nextRequest;
115 };
116
117 AsyncInputStream & source;
118 std::unique_ptr<AsyncInputStream> owned;
119 Pipe pipe;
120
121public:
122 explicit IndirectAsyncInputStreamToSource(AsyncInputStream & source);
123
124 explicit IndirectAsyncInputStreamToSource(box_ptr<AsyncInputStream> owned)
125 : IndirectAsyncInputStreamToSource(*owned)
126 {
127 this->owned = std::move(owned).take();
128 }
129
130 ~IndirectAsyncInputStreamToSource() noexcept(true);
131
132 KJ_DISALLOW_COPY_AND_MOVE(IndirectAsyncInputStreamToSource);
133
135 kj::Promise<void> feed();
136
137 size_t read(char * data, size_t len) override;
138};
139
140}
Definition async-io.hh:82
Definition async-io.hh:19
Definition file-descriptor.hh:51
size_t read(char *data, size_t len) override
Definition async-io.cc:125
kj::Promise< void > feed()
Definition async-io.cc:100
Definition box_ptr.hh:16
Definition async-io.hh:88
Definition generator.hh:236
Definition serialise.hh:18
Definition serialise.hh:66