1#include <port/unistd.h>
2#include <errno.h>
3
4#include <Common/Exception.h>
5#include <Common/ProfileEvents.h>
6#include <Common/CurrentMetrics.h>
7#include <Common/Stopwatch.h>
8
9#include <IO/WriteBufferFromFileDescriptor.h>
10#include <IO/WriteHelpers.h>
11
12
13namespace ProfileEvents
14{
15 extern const Event WriteBufferFromFileDescriptorWrite;
16 extern const Event WriteBufferFromFileDescriptorWriteFailed;
17 extern const Event WriteBufferFromFileDescriptorWriteBytes;
18 extern const Event DiskWriteElapsedMicroseconds;
19}
20
21namespace CurrentMetrics
22{
23 extern const Metric Write;
24}
25
26namespace DB
27{
28
29namespace ErrorCodes
30{
31 extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR;
32 extern const int CANNOT_FSYNC;
33 extern const int CANNOT_SEEK_THROUGH_FILE;
34 extern const int CANNOT_TRUNCATE_FILE;
35}
36
37
38void WriteBufferFromFileDescriptor::nextImpl()
39{
40 if (!offset())
41 return;
42
43 Stopwatch watch;
44
45 size_t bytes_written = 0;
46 while (bytes_written != offset())
47 {
48 ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWrite);
49
50 ssize_t res = 0;
51 {
52 CurrentMetrics::Increment metric_increment{CurrentMetrics::Write};
53 res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
54 }
55
56 if ((-1 == res || 0 == res) && errno != EINTR)
57 {
58 ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteFailed);
59 throwFromErrnoWithPath("Cannot write to file " + getFileName(), getFileName(),
60 ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);
61 }
62
63 if (res > 0)
64 bytes_written += res;
65 }
66
67 ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds());
68 ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written);
69}
70
71
72/// Name or some description of file.
73std::string WriteBufferFromFileDescriptor::getFileName() const
74{
75 return "(fd = " + toString(fd) + ")";
76}
77
78
79WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor(
80 int fd_,
81 size_t buf_size,
82 char * existing_memory,
83 size_t alignment)
84 : WriteBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_) {}
85
86
87WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
88{
89 try
90 {
91 if (fd >= 0)
92 next();
93 }
94 catch (...)
95 {
96 tryLogCurrentException(__PRETTY_FUNCTION__);
97 }
98}
99
100
101off_t WriteBufferFromFileDescriptor::getPositionInFile()
102{
103 return seek(0, SEEK_CUR);
104}
105
106
107void WriteBufferFromFileDescriptor::sync()
108{
109 /// If buffer has pending data - write it.
110 next();
111
112 /// Request OS to sync data with storage medium.
113 int res = fsync(fd);
114 if (-1 == res)
115 throwFromErrnoWithPath("Cannot fsync " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSYNC);
116}
117
118
119off_t WriteBufferFromFileDescriptor::doSeek(off_t offset, int whence)
120{
121 off_t res = lseek(fd, offset, whence);
122 if (-1 == res)
123 throwFromErrnoWithPath("Cannot seek through file " + getFileName(), getFileName(),
124 ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
125 return res;
126}
127
128
129void WriteBufferFromFileDescriptor::doTruncate(off_t length)
130{
131 int res = ftruncate(fd, length);
132 if (-1 == res)
133 throwFromErrnoWithPath("Cannot truncate file " + getFileName(), getFileName(), ErrorCodes::CANNOT_TRUNCATE_FILE);
134}
135
136}
137