1#pragma once
2
#include <fcntl.h>
#include <sys/file.h>
#include <unistd.h>

#include <cerrno>
#include <iostream>
#include <mutex>
#include <string>
#include <utility>

#include <Poco/Exception.h>
#include <Poco/File.h>

#include <Common/Exception.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>

#include <common/Types.h>
20
/// Forward declarations of the error codes referenced by CounterInFile.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace DB::ErrorCodes
{
    extern const int CANNOT_OPEN_FILE;           /// Used when open() or flock() on the counter file fails.
    extern const int CANNOT_READ_ALL_DATA;       /// Treated as "file is empty / truncated" when parsing the counter.
    extern const int ATTEMPT_TO_READ_AFTER_EOF;  /// Treated as "file is empty / truncated" when parsing the counter.
}
30
31
32/** Stores a number in the file.
33 * Designed for rare calls (not designed for performance).
34 */
35class CounterInFile
36{
37private:
38 static inline constexpr size_t SMALL_READ_WRITE_BUFFER_SIZE = 16;
39
40public:
41 /// path - the name of the file, including the path
42 CounterInFile(const std::string & path_) : path(path_) {}
43
44 /** Add `delta` to the number in the file and return the new value.
45 * If the `create_if_need` parameter is not set to true, then
46 * the file should already have a number written (if not - create the file manually with zero).
47 *
48 * To protect against race conditions between different processes, file locks are used.
49 * (But when the first file is created, the race condition is possible, so it's better to create the file in advance.)
50 *
51 * `locked_callback` is called when the counter file is locked. A new value is passed to it.
52 * `locked_callback` can be used to do something atomically with incrementing the counter (for example, renaming files).
53 */
54 template <typename Callback>
55 Int64 add(Int64 delta, Callback && locked_callback, bool create_if_need = false)
56 {
57 std::lock_guard lock(mutex);
58
59 Int64 res = -1;
60
61 bool file_doesnt_exists = !Poco::File(path).exists();
62 if (file_doesnt_exists && !create_if_need)
63 {
64 throw Poco::Exception("File " + path + " does not exist. "
65 "You must create it manulally with appropriate value or 0 for first start.");
66 }
67
68 int fd = ::open(path.c_str(), O_RDWR | O_CREAT, 0666);
69 if (-1 == fd)
70 DB::throwFromErrnoWithPath("Cannot open file " + path, path, DB::ErrorCodes::CANNOT_OPEN_FILE);
71
72 try
73 {
74 int flock_ret = flock(fd, LOCK_EX);
75 if (-1 == flock_ret)
76 DB::throwFromErrnoWithPath("Cannot lock file " + path, path, DB::ErrorCodes::CANNOT_OPEN_FILE);
77
78 if (!file_doesnt_exists)
79 {
80 DB::ReadBufferFromFileDescriptor rb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
81 try
82 {
83 DB::readIntText(res, rb);
84 }
85 catch (const DB::Exception & e)
86 {
87 /// A more understandable error message.
88 if (e.code() == DB::ErrorCodes::CANNOT_READ_ALL_DATA || e.code() == DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
89 throw DB::Exception("File " + path + " is empty. You must fill it manually with appropriate value.", e.code());
90 else
91 throw;
92 }
93 }
94 else
95 res = 0;
96
97 if (delta || file_doesnt_exists)
98 {
99 res += delta;
100
101 DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
102 wb.seek(0);
103 wb.truncate();
104 DB::writeIntText(res, wb);
105 DB::writeChar('\n', wb);
106 wb.sync();
107 }
108
109 locked_callback(res);
110 }
111 catch (...)
112 {
113 close(fd);
114 throw;
115 }
116
117 close(fd);
118 return res;
119 }
120
121 Int64 add(Int64 delta, bool create_if_need = false)
122 {
123 return add(delta, [](UInt64){}, create_if_need);
124 }
125
126 const std::string & getPath() const
127 {
128 return path;
129 }
130
131 /// Change the path to the file.
132 void setPath(std::string path_)
133 {
134 path = path_;
135 }
136
137 // Not thread-safe and not synchronized between processes.
138 void fixIfBroken(UInt64 value)
139 {
140 bool file_exists = Poco::File(path).exists();
141
142 int fd = ::open(path.c_str(), O_RDWR | O_CREAT, 0666);
143 if (-1 == fd)
144 DB::throwFromErrnoWithPath("Cannot open file " + path, path, DB::ErrorCodes::CANNOT_OPEN_FILE);
145
146 try
147 {
148 bool broken = true;
149
150 if (file_exists)
151 {
152 DB::ReadBufferFromFileDescriptor rb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
153 try
154 {
155 UInt64 current_value;
156 DB::readIntText(current_value, rb);
157 char c;
158 DB::readChar(c, rb);
159 if (rb.count() > 0 && c == '\n' && rb.eof())
160 broken = false;
161 }
162 catch (const DB::Exception & e)
163 {
164 if (e.code() != DB::ErrorCodes::CANNOT_READ_ALL_DATA && e.code() != DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
165 throw;
166 }
167 }
168
169 if (broken)
170 {
171 DB::WriteBufferFromFileDescriptor wb(fd, SMALL_READ_WRITE_BUFFER_SIZE);
172 wb.seek(0);
173 wb.truncate();
174 DB::writeIntText(value, wb);
175 DB::writeChar('\n', wb);
176 wb.sync();
177 }
178 }
179 catch (...)
180 {
181 close(fd);
182 throw;
183 }
184
185 close(fd);
186 }
187
188private:
189 std::string path;
190 std::mutex mutex;
191};
192