1 | #include <port/unistd.h> |
2 | #include <errno.h> |
3 | |
4 | #include <Common/Exception.h> |
5 | #include <Common/ProfileEvents.h> |
6 | #include <Common/CurrentMetrics.h> |
7 | #include <Common/Stopwatch.h> |
8 | |
9 | #include <IO/WriteBufferFromFileDescriptor.h> |
10 | #include <IO/WriteHelpers.h> |
11 | |
12 | |
13 | namespace ProfileEvents |
14 | { |
15 | extern const Event WriteBufferFromFileDescriptorWrite; |
16 | extern const Event WriteBufferFromFileDescriptorWriteFailed; |
17 | extern const Event WriteBufferFromFileDescriptorWriteBytes; |
18 | extern const Event DiskWriteElapsedMicroseconds; |
19 | } |
20 | |
21 | namespace CurrentMetrics |
22 | { |
23 | extern const Metric Write; |
24 | } |
25 | |
26 | namespace DB |
27 | { |
28 | |
29 | namespace ErrorCodes |
30 | { |
31 | extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR; |
32 | extern const int CANNOT_FSYNC; |
33 | extern const int CANNOT_SEEK_THROUGH_FILE; |
34 | extern const int CANNOT_TRUNCATE_FILE; |
35 | } |
36 | |
37 | |
38 | void WriteBufferFromFileDescriptor::nextImpl() |
39 | { |
40 | if (!offset()) |
41 | return; |
42 | |
43 | Stopwatch watch; |
44 | |
45 | size_t bytes_written = 0; |
46 | while (bytes_written != offset()) |
47 | { |
48 | ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWrite); |
49 | |
50 | ssize_t res = 0; |
51 | { |
52 | CurrentMetrics::Increment metric_increment{CurrentMetrics::Write}; |
53 | res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written); |
54 | } |
55 | |
56 | if ((-1 == res || 0 == res) && errno != EINTR) |
57 | { |
58 | ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteFailed); |
59 | throwFromErrnoWithPath("Cannot write to file " + getFileName(), getFileName(), |
60 | ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR); |
61 | } |
62 | |
63 | if (res > 0) |
64 | bytes_written += res; |
65 | } |
66 | |
67 | ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds()); |
68 | ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written); |
69 | } |
70 | |
71 | |
72 | /// Name or some description of file. |
73 | std::string WriteBufferFromFileDescriptor::getFileName() const |
74 | { |
75 | return "(fd = " + toString(fd) + ")" ; |
76 | } |
77 | |
78 | |
79 | WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor( |
80 | int fd_, |
81 | size_t buf_size, |
82 | char * existing_memory, |
83 | size_t alignment) |
84 | : WriteBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_) {} |
85 | |
86 | |
87 | WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor() |
88 | { |
89 | try |
90 | { |
91 | if (fd >= 0) |
92 | next(); |
93 | } |
94 | catch (...) |
95 | { |
96 | tryLogCurrentException(__PRETTY_FUNCTION__); |
97 | } |
98 | } |
99 | |
100 | |
101 | off_t WriteBufferFromFileDescriptor::getPositionInFile() |
102 | { |
103 | return seek(0, SEEK_CUR); |
104 | } |
105 | |
106 | |
107 | void WriteBufferFromFileDescriptor::sync() |
108 | { |
109 | /// If buffer has pending data - write it. |
110 | next(); |
111 | |
112 | /// Request OS to sync data with storage medium. |
113 | int res = fsync(fd); |
114 | if (-1 == res) |
115 | throwFromErrnoWithPath("Cannot fsync " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSYNC); |
116 | } |
117 | |
118 | |
119 | off_t WriteBufferFromFileDescriptor::doSeek(off_t offset, int whence) |
120 | { |
121 | off_t res = lseek(fd, offset, whence); |
122 | if (-1 == res) |
123 | throwFromErrnoWithPath("Cannot seek through file " + getFileName(), getFileName(), |
124 | ErrorCodes::CANNOT_SEEK_THROUGH_FILE); |
125 | return res; |
126 | } |
127 | |
128 | |
129 | void WriteBufferFromFileDescriptor::doTruncate(off_t length) |
130 | { |
131 | int res = ftruncate(fd, length); |
132 | if (-1 == res) |
133 | throwFromErrnoWithPath("Cannot truncate file " + getFileName(), getFileName(), ErrorCodes::CANNOT_TRUNCATE_FILE); |
134 | } |
135 | |
136 | } |
137 | |