xrootd
XrdTpcStream.hh
Go to the documentation of this file.
1
10#include <memory>
11#include <vector>
12#include <string>
13
14#include <cstring>
15
16struct stat;
17
18class XrdSfsFile;
19class XrdSysError;
20
21namespace TPC {
22class Stream {
23public:
24 Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
25 : m_open_for_write(false),
26 m_avail_count(max_blocks),
27 m_fh(std::move(fh)),
28 m_offset(0),
29 m_log(log)
30 {
31 m_buffers.reserve(max_blocks);
32 for (size_t idx=0; idx < max_blocks; idx++) {
33 m_buffers.push_back(new Entry(buffer_size));
34 }
35 m_open_for_write = true;
36 }
37
39
40 int Stat(struct stat *);
41
42 int Read(off_t offset, char *buffer, size_t size);
43
44 // Writes a buffer of a given size to an offset.
45 // This will often keep the buffer in memory in to present the underlying
46 // filesystem with a single stream of data (required for HDFS); further,
47 // it will also buffer to align the writes on a 1MB boundary (required
48 // for some RADOS configurations). When force is set to true, it will
49 // skip the buffering and always write (this should only be done at the
50 // end of a stream!).
51 //
52 // Returns the number of bytes written; on error, returns -1 and sets
53 // the error code and error message for the stream
54 ssize_t Write(off_t offset, const char *buffer, size_t size, bool force);
55
56 size_t AvailableBuffers() const {return m_avail_count;}
57
58 void DumpBuffers() const;
59
60 // Flush and finalize the stream. If all data has been sent to the underlying
61 // file handle, close() will be invoked on the file handle.
62 //
63 // Further write operations on this stream will result in an error.
64 // If any memory buffers remain, an error occurs.
65 //
66 // Returns true on success; false otherwise.
67 bool Finalize();
68
69 std::string GetErrorMessage() const {return m_error_buf;}
70
71private:
72
73 class Entry {
74 public:
75 Entry(size_t capacity) :
76 m_offset(-1),
77 m_capacity(capacity),
78 m_size(0)
79 {}
80
81 bool Available() const {return m_offset == -1;}
82
83 int Write(Stream &stream, bool force) {
84 if (Available() || !CanWrite(stream)) {return 0;}
85 // Only full buffer writes are accepted unless the stream forces a flush
86 // (i.e., we are at EOF) because the multistream code uses buffer occupancy
87 // to determine how many streams are currently in-flight. If we do an early
88 // write, then the buffer will be empty and the multistream code may decide
89 // to start another request (which we don't have the capacity to serve!).
90 if (!force && (m_size != m_capacity)) {
91 return 0;
92 }
93 ssize_t retval = stream.WriteImpl(m_offset, &m_buffer[0], m_size);
94 // Currently the only valid negative value is SFS_ERROR (-1); checking for
95 // all negative values to future-proof the code.
96 if ((retval < 0) || (static_cast<size_t>(retval) != m_size)) {
97 return -1;
98 }
99 m_offset = -1;
100 m_size = 0;
101 return retval;
102 }
103
104 size_t Accept(off_t offset, const char *buf, size_t size) {
105 // Validate acceptance criteria.
106 if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
107 return 0;
108 }
109 size_t to_accept = m_capacity - m_size;
110 if (to_accept == 0) {return 0;}
111 if (size > to_accept) {
112 size = to_accept;
113 }
114
115 // Inflate the underlying buffer if needed.
116 ssize_t new_bytes_needed = (m_size + size) - m_buffer.capacity();
117 if (new_bytes_needed > 0) {
118 m_buffer.reserve(m_capacity);
119 }
120
121 // Finally, do the copy.
122 memcpy(&m_buffer[0] + m_size, buf, size);
123 m_size += size;
124 if (m_offset == -1) {
125 m_offset = offset;
126 }
127 return size;
128 }
129
131 if (!Available()) {return;}
132#if __cplusplus > 199711L
133 m_buffer.shrink_to_fit();
134#endif
135 }
136
137 void Move(Entry &other) {
138 m_buffer.swap(other.m_buffer);
139 m_offset = other.m_offset;
140 m_size = other.m_size;
141 }
142
143 off_t GetOffset() const {return m_offset;}
144 size_t GetCapacity() const {return m_capacity;}
145 size_t GetSize() const {return m_size;}
146
147 private:
148
149 Entry(const Entry&) = delete;
150
151 bool CanWrite(Stream &stream) const {
152 return (m_size > 0) && (m_offset == stream.m_offset);
153 }
154
155 off_t m_offset; // Offset within file that m_buffer[0] represents.
157 size_t m_size; // Number of bytes held in buffer.
158 std::vector<char> m_buffer;
159 };
160
161 ssize_t WriteImpl(off_t offset, const char *buffer, size_t size);
162
165 std::unique_ptr<XrdSfsFile> m_fh;
166 off_t m_offset;
167 std::vector<Entry*> m_buffers;
169 std::string m_error_buf;
170};
171}
#define stat(a, b)
Definition: XrdPosix.hh:96
Definition: XrdTpcStream.hh:73
void Move(Entry &other)
Definition: XrdTpcStream.hh:137
size_t m_size
Definition: XrdTpcStream.hh:157
int Write(Stream &stream, bool force)
Definition: XrdTpcStream.hh:83
off_t m_offset
Definition: XrdTpcStream.hh:155
Entry(size_t capacity)
Definition: XrdTpcStream.hh:75
off_t GetOffset() const
Definition: XrdTpcStream.hh:143
std::vector< char > m_buffer
Definition: XrdTpcStream.hh:158
bool CanWrite(Stream &stream) const
Definition: XrdTpcStream.hh:151
Entry(const Entry &)=delete
bool Available() const
Definition: XrdTpcStream.hh:81
size_t GetSize() const
Definition: XrdTpcStream.hh:145
size_t GetCapacity() const
Definition: XrdTpcStream.hh:144
void ShrinkIfUnused()
Definition: XrdTpcStream.hh:130
size_t Accept(off_t offset, const char *buf, size_t size)
Definition: XrdTpcStream.hh:104
size_t m_capacity
Definition: XrdTpcStream.hh:156
Definition: XrdTpcStream.hh:22
std::unique_ptr< XrdSfsFile > m_fh
Definition: XrdTpcStream.hh:165
ssize_t WriteImpl(off_t offset, const char *buffer, size_t size)
bool m_open_for_write
Definition: XrdTpcStream.hh:163
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
off_t m_offset
Definition: XrdTpcStream.hh:166
void DumpBuffers() const
size_t m_avail_count
Definition: XrdTpcStream.hh:164
XrdSysError & m_log
Definition: XrdTpcStream.hh:168
int Stat(struct stat *)
std::vector< Entry * > m_buffers
Definition: XrdTpcStream.hh:167
int Read(off_t offset, char *buffer, size_t size)
bool Finalize()
Stream(std::unique_ptr< XrdSfsFile > fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
Definition: XrdTpcStream.hh:24
std::string GetErrorMessage() const
Definition: XrdTpcStream.hh:69
size_t AvailableBuffers() const
Definition: XrdTpcStream.hh:56
std::string m_error_buf
Definition: XrdTpcStream.hh:169
Definition: XrdSfsInterface.hh:652
Definition: XrdSysError.hh:90
Definition: XrdTpcState.hh:16