1 | #pragma once |
2 | |
#include <fcntl.h>
#include <sys/file.h>
#include <unistd.h>

#include <iostream>
#include <mutex>
#include <string>
#include <utility>

#include <Poco/Exception.h>
#include <Poco/File.h>

#include <Common/Exception.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>

#include <common/Types.h>
20 | |
/// Declarations of the error codes referenced by CounterInFile in this header.
/// They are only declared here (`extern`); presumably they are defined elsewhere in the project.
namespace DB
{
namespace ErrorCodes
{
/// Passed to throwFromErrnoWithPath when open() or flock() on the counter file fails.
extern const int CANNOT_OPEN_FILE;
/// Compared against the code of exceptions caught while parsing the stored number.
extern const int CANNOT_READ_ALL_DATA;
/// Compared against the code of exceptions caught while parsing the stored number.
extern const int ATTEMPT_TO_READ_AFTER_EOF;
}
}
30 | |
31 | |
32 | /** Stores a number in the file. |
33 | * Designed for rare calls (not designed for performance). |
34 | */ |
35 | class CounterInFile |
36 | { |
37 | private: |
38 | static inline constexpr size_t SMALL_READ_WRITE_BUFFER_SIZE = 16; |
39 | |
40 | public: |
41 | /// path - the name of the file, including the path |
42 | CounterInFile(const std::string & path_) : path(path_) {} |
43 | |
44 | /** Add `delta` to the number in the file and return the new value. |
45 | * If the `create_if_need` parameter is not set to true, then |
46 | * the file should already have a number written (if not - create the file manually with zero). |
47 | * |
48 | * To protect against race conditions between different processes, file locks are used. |
49 | * (But when the first file is created, the race condition is possible, so it's better to create the file in advance.) |
50 | * |
51 | * `locked_callback` is called when the counter file is locked. A new value is passed to it. |
52 | * `locked_callback` can be used to do something atomically with incrementing the counter (for example, renaming files). |
53 | */ |
54 | template <typename Callback> |
55 | Int64 add(Int64 delta, Callback && locked_callback, bool create_if_need = false) |
56 | { |
57 | std::lock_guard lock(mutex); |
58 | |
59 | Int64 res = -1; |
60 | |
61 | bool file_doesnt_exists = !Poco::File(path).exists(); |
62 | if (file_doesnt_exists && !create_if_need) |
63 | { |
64 | throw Poco::Exception("File " + path + " does not exist. " |
65 | "You must create it manulally with appropriate value or 0 for first start." ); |
66 | } |
67 | |
68 | int fd = ::open(path.c_str(), O_RDWR | O_CREAT, 0666); |
69 | if (-1 == fd) |
70 | DB::throwFromErrnoWithPath("Cannot open file " + path, path, DB::ErrorCodes::CANNOT_OPEN_FILE); |
71 | |
72 | try |
73 | { |
74 | int flock_ret = flock(fd, LOCK_EX); |
75 | if (-1 == flock_ret) |
76 | DB::throwFromErrnoWithPath("Cannot lock file " + path, path, DB::ErrorCodes::CANNOT_OPEN_FILE); |
77 | |
78 | if (!file_doesnt_exists) |
79 | { |
80 | DB::ReadBufferFromFileDescriptor rb(fd, SMALL_READ_WRITE_BUFFER_SIZE); |
81 | try |
82 | { |
83 | DB::readIntText(res, rb); |
84 | } |
85 | catch (const DB::Exception & e) |
86 | { |
87 | /// A more understandable error message. |
88 | if (e.code() == DB::ErrorCodes::CANNOT_READ_ALL_DATA || e.code() == DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF) |
89 | throw DB::Exception("File " + path + " is empty. You must fill it manually with appropriate value." , e.code()); |
90 | else |
91 | throw; |
92 | } |
93 | } |
94 | else |
95 | res = 0; |
96 | |
97 | if (delta || file_doesnt_exists) |
98 | { |
99 | res += delta; |
100 | |
101 | DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE); |
102 | wb.seek(0); |
103 | wb.truncate(); |
104 | DB::writeIntText(res, wb); |
105 | DB::writeChar('\n', wb); |
106 | wb.sync(); |
107 | } |
108 | |
109 | locked_callback(res); |
110 | } |
111 | catch (...) |
112 | { |
113 | close(fd); |
114 | throw; |
115 | } |
116 | |
117 | close(fd); |
118 | return res; |
119 | } |
120 | |
121 | Int64 add(Int64 delta, bool create_if_need = false) |
122 | { |
123 | return add(delta, [](UInt64){}, create_if_need); |
124 | } |
125 | |
126 | const std::string & getPath() const |
127 | { |
128 | return path; |
129 | } |
130 | |
131 | /// Change the path to the file. |
132 | void setPath(std::string path_) |
133 | { |
134 | path = path_; |
135 | } |
136 | |
137 | // Not thread-safe and not synchronized between processes. |
138 | void fixIfBroken(UInt64 value) |
139 | { |
140 | bool file_exists = Poco::File(path).exists(); |
141 | |
142 | int fd = ::open(path.c_str(), O_RDWR | O_CREAT, 0666); |
143 | if (-1 == fd) |
144 | DB::throwFromErrnoWithPath("Cannot open file " + path, path, DB::ErrorCodes::CANNOT_OPEN_FILE); |
145 | |
146 | try |
147 | { |
148 | bool broken = true; |
149 | |
150 | if (file_exists) |
151 | { |
152 | DB::ReadBufferFromFileDescriptor rb(fd, SMALL_READ_WRITE_BUFFER_SIZE); |
153 | try |
154 | { |
155 | UInt64 current_value; |
156 | DB::readIntText(current_value, rb); |
157 | char c; |
158 | DB::readChar(c, rb); |
159 | if (rb.count() > 0 && c == '\n' && rb.eof()) |
160 | broken = false; |
161 | } |
162 | catch (const DB::Exception & e) |
163 | { |
164 | if (e.code() != DB::ErrorCodes::CANNOT_READ_ALL_DATA && e.code() != DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF) |
165 | throw; |
166 | } |
167 | } |
168 | |
169 | if (broken) |
170 | { |
171 | DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE); |
172 | wb.seek(0); |
173 | wb.truncate(); |
174 | DB::writeIntText(value, wb); |
175 | DB::writeChar('\n', wb); |
176 | wb.sync(); |
177 | } |
178 | } |
179 | catch (...) |
180 | { |
181 | close(fd); |
182 | throw; |
183 | } |
184 | |
185 | close(fd); |
186 | } |
187 | |
188 | private: |
189 | std::string path; |
190 | std::mutex mutex; |
191 | }; |
192 | |