1#include <boost/program_options.hpp>
2
3#include <Common/hex.h>
4#include <IO/ReadBuffer.h>
5#include <IO/WriteBuffer.h>
6#include <IO/ReadHelpers.h>
7#include <IO/WriteHelpers.h>
8#include <IO/ReadBufferFromFileDescriptor.h>
9#include <IO/WriteBufferFromFileDescriptor.h>
10
11
12/** Reads uncompressed wikistat data from stdin,
13 * and writes transformed data in tsv format,
14 * ready to be loaded into ClickHouse.
15 *
16 * Input data has format:
17 *
18 * aa Wikipedia 1 17224
19 * aa.b Main_Page 2 21163
20 *
21 * project, optional subproject, path, hits, total size in bytes.
22 */
23
24
25template <bool break_at_dot>
26static void readString(std::string & s, DB::ReadBuffer & buf)
27{
28 s.clear();
29
30 while (!buf.eof())
31 {
32 const char * next_pos;
33
34 if (break_at_dot)
35 next_pos = find_first_symbols<' ', '\n', '.'>(buf.position(), buf.buffer().end());
36 else
37 next_pos = find_first_symbols<' ', '\n'>(buf.position(), buf.buffer().end());
38
39 s.append(buf.position(), next_pos - buf.position());
40 buf.position() += next_pos - buf.position();
41
42 if (!buf.hasPendingData())
43 continue;
44
45 if (*buf.position() == ' ' || *buf.position() == '\n' || (break_at_dot && *buf.position() == '.'))
46 return;
47 }
48}
49
50
51/** Reads path before whitespace and decodes %xx sequences (to more compact and handy representation),
52 * except %2F '/', %26 '&', %3D '=', %3F '?', %23 '#' (to not break structure of URL).
53 */
54static void readPath(std::string & s, DB::ReadBuffer & buf)
55{
56 s.clear();
57
58 while (!buf.eof())
59 {
60 const char * next_pos = find_first_symbols<' ', '\n', '%'>(buf.position(), buf.buffer().end());
61
62 s.append(buf.position(), next_pos - buf.position());
63 buf.position() += next_pos - buf.position();
64
65 if (!buf.hasPendingData())
66 continue;
67
68 if (*buf.position() == ' ' || *buf.position() == '\n')
69 return;
70
71 if (*buf.position() == '%')
72 {
73 ++buf.position();
74
75 char c1;
76 char c2;
77
78 if (buf.eof() || *buf.position() == ' ')
79 break;
80
81 DB::readChar(c1, buf);
82
83 if (buf.eof() || *buf.position() == ' ')
84 break;
85
86 DB::readChar(c2, buf);
87
88 if ((c1 == '2' && (c2 == 'f' || c2 == '6' || c2 == '3' || c2 == 'F'))
89 || (c1 == '3' && (c2 == 'd' || c2 == 'f' || c2 == 'D' || c2 == 'F')))
90 {
91 s += '%';
92 s += c1;
93 s += c2;
94 }
95 else
96 s += static_cast<char>(static_cast<UInt8>(unhex(c1)) * 16 + static_cast<UInt8>(unhex(c2)));
97 }
98 }
99}
100
101
102static void skipUntilNewline(DB::ReadBuffer & buf)
103{
104 while (!buf.eof())
105 {
106 const char * next_pos = find_first_symbols<'\n'>(buf.position(), buf.buffer().end());
107
108 buf.position() += next_pos - buf.position();
109
110 if (!buf.hasPendingData())
111 continue;
112
113 if (*buf.position() == '\n')
114 {
115 ++buf.position();
116 return;
117 }
118 }
119}
120
121
/// Declaration only: the constant itself is defined elsewhere.
/// main() compares exception codes against it to recognise lines that failed an input assertion.
namespace DB { namespace ErrorCodes { extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED; } }
129
130
131int main(int argc, char ** argv)
132try
133{
134 boost::program_options::options_description desc("Allowed options");
135 desc.add_options()
136 ("help,h", "produce help message")
137 ("time", boost::program_options::value<std::string>()->required(),
138 "time of data in YYYY-MM-DD hh:mm:ss form")
139 ;
140
141 boost::program_options::variables_map options;
142 boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
143
144 if (options.count("help"))
145 {
146 std::cout << "Reads uncompressed wikistat data from stdin and writes transformed data in tsv format." << std::endl;
147 std::cout << "Usage: " << argv[0] << " --time='YYYY-MM-DD hh:00:00' < in > out" << std::endl;
148 std::cout << desc << std::endl;
149 return 1;
150 }
151
152 std::string time_str = options.at("time").as<std::string>();
153 LocalDateTime time(time_str);
154 LocalDate date(time);
155
156 DB::ReadBufferFromFileDescriptor in(STDIN_FILENO);
157 DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO);
158
159 std::string project;
160 std::string subproject;
161 std::string path;
162 UInt64 hits = 0;
163 UInt64 size = 0;
164
165 size_t row_num = 0;
166 while (!in.eof())
167 {
168 try
169 {
170 ++row_num;
171 readString<true>(project, in);
172
173 if (in.eof())
174 break;
175
176 if (*in.position() == '.')
177 readString<false>(subproject, in);
178 else
179 subproject.clear();
180
181 DB::assertChar(' ', in);
182 readPath(path, in);
183 DB::assertChar(' ', in);
184 DB::readIntText(hits, in);
185 DB::assertChar(' ', in);
186 DB::readIntText(size, in);
187 DB::assertChar('\n', in);
188 }
189 catch (const DB::Exception & e)
190 {
191 /// Sometimes, input data has errors. For example, look at first lines in pagecounts-20130210-130000.gz
192 /// To save rest of data, just skip lines with errors.
193 if (e.code() == DB::ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED)
194 {
195 std::cerr << "At row " << row_num << ": " << DB::getCurrentExceptionMessage(false) << '\n';
196 skipUntilNewline(in);
197 continue;
198 }
199 else
200 throw;
201 }
202
203 DB::writeText(date, out);
204 DB::writeChar('\t', out);
205 DB::writeText(time, out);
206 DB::writeChar('\t', out);
207 DB::writeText(project, out);
208 DB::writeChar('\t', out);
209 DB::writeText(subproject, out);
210 DB::writeChar('\t', out);
211 DB::writeText(path, out);
212 DB::writeChar('\t', out);
213 DB::writeText(hits, out);
214 DB::writeChar('\t', out);
215 DB::writeText(size, out);
216 DB::writeChar('\n', out);
217 }
218
219 return 0;
220}
221catch (...)
222{
223 std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
224 throw;
225}
226