1 | #include <boost/program_options.hpp> |
2 | |
3 | #include <Common/hex.h> |
4 | #include <IO/ReadBuffer.h> |
5 | #include <IO/WriteBuffer.h> |
6 | #include <IO/ReadHelpers.h> |
7 | #include <IO/WriteHelpers.h> |
8 | #include <IO/ReadBufferFromFileDescriptor.h> |
9 | #include <IO/WriteBufferFromFileDescriptor.h> |
10 | |
11 | |
12 | /** Reads uncompressed wikistat data from stdin, |
13 | * and writes transformed data in tsv format, |
14 | * ready to be loaded into ClickHouse. |
15 | * |
16 | * Input data has format: |
17 | * |
18 | * aa Wikipedia 1 17224 |
19 | * aa.b Main_Page 2 21163 |
20 | * |
21 | * project, optional subproject, path, hits, total size in bytes. |
22 | */ |
23 | |
24 | |
25 | template <bool break_at_dot> |
26 | static void readString(std::string & s, DB::ReadBuffer & buf) |
27 | { |
28 | s.clear(); |
29 | |
30 | while (!buf.eof()) |
31 | { |
32 | const char * next_pos; |
33 | |
34 | if (break_at_dot) |
35 | next_pos = find_first_symbols<' ', '\n', '.'>(buf.position(), buf.buffer().end()); |
36 | else |
37 | next_pos = find_first_symbols<' ', '\n'>(buf.position(), buf.buffer().end()); |
38 | |
39 | s.append(buf.position(), next_pos - buf.position()); |
40 | buf.position() += next_pos - buf.position(); |
41 | |
42 | if (!buf.hasPendingData()) |
43 | continue; |
44 | |
45 | if (*buf.position() == ' ' || *buf.position() == '\n' || (break_at_dot && *buf.position() == '.')) |
46 | return; |
47 | } |
48 | } |
49 | |
50 | |
51 | /** Reads path before whitespace and decodes %xx sequences (to more compact and handy representation), |
52 | * except %2F '/', %26 '&', %3D '=', %3F '?', %23 '#' (to not break structure of URL). |
53 | */ |
54 | static void readPath(std::string & s, DB::ReadBuffer & buf) |
55 | { |
56 | s.clear(); |
57 | |
58 | while (!buf.eof()) |
59 | { |
60 | const char * next_pos = find_first_symbols<' ', '\n', '%'>(buf.position(), buf.buffer().end()); |
61 | |
62 | s.append(buf.position(), next_pos - buf.position()); |
63 | buf.position() += next_pos - buf.position(); |
64 | |
65 | if (!buf.hasPendingData()) |
66 | continue; |
67 | |
68 | if (*buf.position() == ' ' || *buf.position() == '\n') |
69 | return; |
70 | |
71 | if (*buf.position() == '%') |
72 | { |
73 | ++buf.position(); |
74 | |
75 | char c1; |
76 | char c2; |
77 | |
78 | if (buf.eof() || *buf.position() == ' ') |
79 | break; |
80 | |
81 | DB::readChar(c1, buf); |
82 | |
83 | if (buf.eof() || *buf.position() == ' ') |
84 | break; |
85 | |
86 | DB::readChar(c2, buf); |
87 | |
88 | if ((c1 == '2' && (c2 == 'f' || c2 == '6' || c2 == '3' || c2 == 'F')) |
89 | || (c1 == '3' && (c2 == 'd' || c2 == 'f' || c2 == 'D' || c2 == 'F'))) |
90 | { |
91 | s += '%'; |
92 | s += c1; |
93 | s += c2; |
94 | } |
95 | else |
96 | s += static_cast<char>(static_cast<UInt8>(unhex(c1)) * 16 + static_cast<UInt8>(unhex(c2))); |
97 | } |
98 | } |
99 | } |
100 | |
101 | |
102 | static void skipUntilNewline(DB::ReadBuffer & buf) |
103 | { |
104 | while (!buf.eof()) |
105 | { |
106 | const char * next_pos = find_first_symbols<'\n'>(buf.position(), buf.buffer().end()); |
107 | |
108 | buf.position() += next_pos - buf.position(); |
109 | |
110 | if (!buf.hasPendingData()) |
111 | continue; |
112 | |
113 | if (*buf.position() == '\n') |
114 | { |
115 | ++buf.position(); |
116 | return; |
117 | } |
118 | } |
119 | } |
120 | |
121 | |
/// Declaration only: the value of this error code is defined elsewhere.
/// It is compared against DB::Exception::code() to recognise malformed input rows.
namespace DB::ErrorCodes
{
    extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
}
129 | |
130 | |
131 | int main(int argc, char ** argv) |
132 | try |
133 | { |
134 | boost::program_options::options_description desc("Allowed options" ); |
135 | desc.add_options() |
136 | ("help,h" , "produce help message" ) |
137 | ("time" , boost::program_options::value<std::string>()->required(), |
138 | "time of data in YYYY-MM-DD hh:mm:ss form" ) |
139 | ; |
140 | |
141 | boost::program_options::variables_map options; |
142 | boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options); |
143 | |
144 | if (options.count("help" )) |
145 | { |
146 | std::cout << "Reads uncompressed wikistat data from stdin and writes transformed data in tsv format." << std::endl; |
147 | std::cout << "Usage: " << argv[0] << " --time='YYYY-MM-DD hh:00:00' < in > out" << std::endl; |
148 | std::cout << desc << std::endl; |
149 | return 1; |
150 | } |
151 | |
152 | std::string time_str = options.at("time" ).as<std::string>(); |
153 | LocalDateTime time(time_str); |
154 | LocalDate date(time); |
155 | |
156 | DB::ReadBufferFromFileDescriptor in(STDIN_FILENO); |
157 | DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO); |
158 | |
159 | std::string project; |
160 | std::string subproject; |
161 | std::string path; |
162 | UInt64 hits = 0; |
163 | UInt64 size = 0; |
164 | |
165 | size_t row_num = 0; |
166 | while (!in.eof()) |
167 | { |
168 | try |
169 | { |
170 | ++row_num; |
171 | readString<true>(project, in); |
172 | |
173 | if (in.eof()) |
174 | break; |
175 | |
176 | if (*in.position() == '.') |
177 | readString<false>(subproject, in); |
178 | else |
179 | subproject.clear(); |
180 | |
181 | DB::assertChar(' ', in); |
182 | readPath(path, in); |
183 | DB::assertChar(' ', in); |
184 | DB::readIntText(hits, in); |
185 | DB::assertChar(' ', in); |
186 | DB::readIntText(size, in); |
187 | DB::assertChar('\n', in); |
188 | } |
189 | catch (const DB::Exception & e) |
190 | { |
191 | /// Sometimes, input data has errors. For example, look at first lines in pagecounts-20130210-130000.gz |
192 | /// To save rest of data, just skip lines with errors. |
193 | if (e.code() == DB::ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED) |
194 | { |
195 | std::cerr << "At row " << row_num << ": " << DB::getCurrentExceptionMessage(false) << '\n'; |
196 | skipUntilNewline(in); |
197 | continue; |
198 | } |
199 | else |
200 | throw; |
201 | } |
202 | |
203 | DB::writeText(date, out); |
204 | DB::writeChar('\t', out); |
205 | DB::writeText(time, out); |
206 | DB::writeChar('\t', out); |
207 | DB::writeText(project, out); |
208 | DB::writeChar('\t', out); |
209 | DB::writeText(subproject, out); |
210 | DB::writeChar('\t', out); |
211 | DB::writeText(path, out); |
212 | DB::writeChar('\t', out); |
213 | DB::writeText(hits, out); |
214 | DB::writeChar('\t', out); |
215 | DB::writeText(size, out); |
216 | DB::writeChar('\n', out); |
217 | } |
218 | |
219 | return 0; |
220 | } |
221 | catch (...) |
222 | { |
223 | std::cerr << DB::getCurrentExceptionMessage(true) << '\n'; |
224 | throw; |
225 | } |
226 | |