1#include <Common/PipeFDs.h>
2#include <Common/Exception.h>
3#include <Common/formatReadable.h>
4
5#include <common/logger_useful.h>
6
7#include <unistd.h>
8#include <fcntl.h>
9#include <string>
10#include <algorithm>
11
12
13namespace DB
14{
15
16namespace ErrorCodes
17{
18 extern const int CANNOT_PIPE;
19 extern const int CANNOT_FCNTL;
20 extern const int LOGICAL_ERROR;
21}
22
23void LazyPipeFDs::open()
24{
25 for (int & fd : fds_rw)
26 if (fd >= 0)
27 throw Exception("Pipe is already opened", ErrorCodes::LOGICAL_ERROR);
28
29#ifndef __APPLE__
30 if (0 != pipe2(fds_rw, O_CLOEXEC))
31 throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE);
32#else
33 if (0 != pipe(fds_rw))
34 throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_PIPE);
35 if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
36 throwFromErrno("Cannot setup auto-close on exec for read end of pipe", ErrorCodes::CANNOT_FCNTL);
37 if (0 != fcntl(fds_rw[1], F_SETFD, FD_CLOEXEC))
38 throwFromErrno("Cannot setup auto-close on exec for write end of pipe", ErrorCodes::CANNOT_FCNTL);
39#endif
40}
41
42void LazyPipeFDs::close()
43{
44 for (int & fd : fds_rw)
45 {
46 if (fd < 0)
47 continue;
48 if (0 != ::close(fd))
49 throwFromErrno("Cannot close pipe", ErrorCodes::CANNOT_PIPE);
50 fd = -1;
51 }
52}
53
54PipeFDs::PipeFDs()
55{
56 open();
57}
58
59LazyPipeFDs::~LazyPipeFDs()
60{
61 try
62 {
63 close();
64 }
65 catch (...)
66 {
67 tryLogCurrentException(__PRETTY_FUNCTION__);
68 }
69}
70
71
72void LazyPipeFDs::setNonBlocking()
73{
74 int flags = fcntl(fds_rw[1], F_GETFL, 0);
75 if (-1 == flags)
76 throwFromErrno("Cannot get file status flags of pipe", ErrorCodes::CANNOT_FCNTL);
77 if (-1 == fcntl(fds_rw[1], F_SETFL, flags | O_NONBLOCK))
78 throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
79}
80
81void LazyPipeFDs::tryIncreaseSize(int desired_size)
82{
83#if defined(OS_LINUX)
84 Poco::Logger * log = &Poco::Logger::get("Pipe");
85
86 /** Increase pipe size to avoid slowdown during fine-grained trace collection.
87 */
88 int pipe_size = fcntl(fds_rw[1], F_GETPIPE_SZ);
89 if (-1 == pipe_size)
90 {
91 if (errno == EINVAL)
92 {
93 LOG_INFO(log, "Cannot get pipe capacity, " << errnoToString(ErrorCodes::CANNOT_FCNTL) << ". Very old Linux kernels have no support for this fcntl.");
94 /// It will work nevertheless.
95 }
96 else
97 throwFromErrno("Cannot get pipe capacity", ErrorCodes::CANNOT_FCNTL);
98 }
99 else
100 {
101 for (errno = 0; errno != EPERM && pipe_size < desired_size; pipe_size *= 2)
102 if (-1 == fcntl(fds_rw[1], F_SETPIPE_SZ, pipe_size * 2) && errno != EPERM)
103 throwFromErrno("Cannot increase pipe capacity to " + std::to_string(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
104
105 LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, desired_size)));
106 }
107#else
108 (void)desired_size;
109#endif
110}
111
112}
113