| 1 | #include <boost/program_options.hpp> |
| 2 | |
| 3 | #include <Common/hex.h> |
| 4 | #include <IO/ReadBuffer.h> |
| 5 | #include <IO/WriteBuffer.h> |
| 6 | #include <IO/ReadHelpers.h> |
| 7 | #include <IO/WriteHelpers.h> |
| 8 | #include <IO/ReadBufferFromFileDescriptor.h> |
| 9 | #include <IO/WriteBufferFromFileDescriptor.h> |
| 10 | |
| 11 | |
| 12 | /** Reads uncompressed wikistat data from stdin, |
| 13 | * and writes transformed data in tsv format, |
| 14 | * ready to be loaded into ClickHouse. |
| 15 | * |
| 16 | * Input data has format: |
| 17 | * |
| 18 | * aa Wikipedia 1 17224 |
| 19 | * aa.b Main_Page 2 21163 |
| 20 | * |
| 21 | * project, optional subproject, path, hits, total size in bytes. |
| 22 | */ |
| 23 | |
| 24 | |
| 25 | template <bool break_at_dot> |
| 26 | static void readString(std::string & s, DB::ReadBuffer & buf) |
| 27 | { |
| 28 | s.clear(); |
| 29 | |
| 30 | while (!buf.eof()) |
| 31 | { |
| 32 | const char * next_pos; |
| 33 | |
| 34 | if (break_at_dot) |
| 35 | next_pos = find_first_symbols<' ', '\n', '.'>(buf.position(), buf.buffer().end()); |
| 36 | else |
| 37 | next_pos = find_first_symbols<' ', '\n'>(buf.position(), buf.buffer().end()); |
| 38 | |
| 39 | s.append(buf.position(), next_pos - buf.position()); |
| 40 | buf.position() += next_pos - buf.position(); |
| 41 | |
| 42 | if (!buf.hasPendingData()) |
| 43 | continue; |
| 44 | |
| 45 | if (*buf.position() == ' ' || *buf.position() == '\n' || (break_at_dot && *buf.position() == '.')) |
| 46 | return; |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | |
| 51 | /** Reads path before whitespace and decodes %xx sequences (to more compact and handy representation), |
| 52 | * except %2F '/', %26 '&', %3D '=', %3F '?', %23 '#' (to not break structure of URL). |
| 53 | */ |
| 54 | static void readPath(std::string & s, DB::ReadBuffer & buf) |
| 55 | { |
| 56 | s.clear(); |
| 57 | |
| 58 | while (!buf.eof()) |
| 59 | { |
| 60 | const char * next_pos = find_first_symbols<' ', '\n', '%'>(buf.position(), buf.buffer().end()); |
| 61 | |
| 62 | s.append(buf.position(), next_pos - buf.position()); |
| 63 | buf.position() += next_pos - buf.position(); |
| 64 | |
| 65 | if (!buf.hasPendingData()) |
| 66 | continue; |
| 67 | |
| 68 | if (*buf.position() == ' ' || *buf.position() == '\n') |
| 69 | return; |
| 70 | |
| 71 | if (*buf.position() == '%') |
| 72 | { |
| 73 | ++buf.position(); |
| 74 | |
| 75 | char c1; |
| 76 | char c2; |
| 77 | |
| 78 | if (buf.eof() || *buf.position() == ' ') |
| 79 | break; |
| 80 | |
| 81 | DB::readChar(c1, buf); |
| 82 | |
| 83 | if (buf.eof() || *buf.position() == ' ') |
| 84 | break; |
| 85 | |
| 86 | DB::readChar(c2, buf); |
| 87 | |
| 88 | if ((c1 == '2' && (c2 == 'f' || c2 == '6' || c2 == '3' || c2 == 'F')) |
| 89 | || (c1 == '3' && (c2 == 'd' || c2 == 'f' || c2 == 'D' || c2 == 'F'))) |
| 90 | { |
| 91 | s += '%'; |
| 92 | s += c1; |
| 93 | s += c2; |
| 94 | } |
| 95 | else |
| 96 | s += static_cast<char>(static_cast<UInt8>(unhex(c1)) * 16 + static_cast<UInt8>(unhex(c2))); |
| 97 | } |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | |
| 102 | static void skipUntilNewline(DB::ReadBuffer & buf) |
| 103 | { |
| 104 | while (!buf.eof()) |
| 105 | { |
| 106 | const char * next_pos = find_first_symbols<'\n'>(buf.position(), buf.buffer().end()); |
| 107 | |
| 108 | buf.position() += next_pos - buf.position(); |
| 109 | |
| 110 | if (!buf.hasPendingData()) |
| 111 | continue; |
| 112 | |
| 113 | if (*buf.position() == '\n') |
| 114 | { |
| 115 | ++buf.position(); |
| 116 | return; |
| 117 | } |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | |
/// Declaration of the error code that main() compares against to recognize malformed input rows.
/// Only declared here; the definition is expected to come from elsewhere in the project.
namespace DB { namespace ErrorCodes
{
    extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
} }
| 129 | |
| 130 | |
| 131 | int main(int argc, char ** argv) |
| 132 | try |
| 133 | { |
| 134 | boost::program_options::options_description desc("Allowed options" ); |
| 135 | desc.add_options() |
| 136 | ("help,h" , "produce help message" ) |
| 137 | ("time" , boost::program_options::value<std::string>()->required(), |
| 138 | "time of data in YYYY-MM-DD hh:mm:ss form" ) |
| 139 | ; |
| 140 | |
| 141 | boost::program_options::variables_map options; |
| 142 | boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options); |
| 143 | |
| 144 | if (options.count("help" )) |
| 145 | { |
| 146 | std::cout << "Reads uncompressed wikistat data from stdin and writes transformed data in tsv format." << std::endl; |
| 147 | std::cout << "Usage: " << argv[0] << " --time='YYYY-MM-DD hh:00:00' < in > out" << std::endl; |
| 148 | std::cout << desc << std::endl; |
| 149 | return 1; |
| 150 | } |
| 151 | |
| 152 | std::string time_str = options.at("time" ).as<std::string>(); |
| 153 | LocalDateTime time(time_str); |
| 154 | LocalDate date(time); |
| 155 | |
| 156 | DB::ReadBufferFromFileDescriptor in(STDIN_FILENO); |
| 157 | DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO); |
| 158 | |
| 159 | std::string project; |
| 160 | std::string subproject; |
| 161 | std::string path; |
| 162 | UInt64 hits = 0; |
| 163 | UInt64 size = 0; |
| 164 | |
| 165 | size_t row_num = 0; |
| 166 | while (!in.eof()) |
| 167 | { |
| 168 | try |
| 169 | { |
| 170 | ++row_num; |
| 171 | readString<true>(project, in); |
| 172 | |
| 173 | if (in.eof()) |
| 174 | break; |
| 175 | |
| 176 | if (*in.position() == '.') |
| 177 | readString<false>(subproject, in); |
| 178 | else |
| 179 | subproject.clear(); |
| 180 | |
| 181 | DB::assertChar(' ', in); |
| 182 | readPath(path, in); |
| 183 | DB::assertChar(' ', in); |
| 184 | DB::readIntText(hits, in); |
| 185 | DB::assertChar(' ', in); |
| 186 | DB::readIntText(size, in); |
| 187 | DB::assertChar('\n', in); |
| 188 | } |
| 189 | catch (const DB::Exception & e) |
| 190 | { |
| 191 | /// Sometimes, input data has errors. For example, look at first lines in pagecounts-20130210-130000.gz |
| 192 | /// To save rest of data, just skip lines with errors. |
| 193 | if (e.code() == DB::ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED) |
| 194 | { |
| 195 | std::cerr << "At row " << row_num << ": " << DB::getCurrentExceptionMessage(false) << '\n'; |
| 196 | skipUntilNewline(in); |
| 197 | continue; |
| 198 | } |
| 199 | else |
| 200 | throw; |
| 201 | } |
| 202 | |
| 203 | DB::writeText(date, out); |
| 204 | DB::writeChar('\t', out); |
| 205 | DB::writeText(time, out); |
| 206 | DB::writeChar('\t', out); |
| 207 | DB::writeText(project, out); |
| 208 | DB::writeChar('\t', out); |
| 209 | DB::writeText(subproject, out); |
| 210 | DB::writeChar('\t', out); |
| 211 | DB::writeText(path, out); |
| 212 | DB::writeChar('\t', out); |
| 213 | DB::writeText(hits, out); |
| 214 | DB::writeChar('\t', out); |
| 215 | DB::writeText(size, out); |
| 216 | DB::writeChar('\n', out); |
| 217 | } |
| 218 | |
| 219 | return 0; |
| 220 | } |
| 221 | catch (...) |
| 222 | { |
| 223 | std::cerr << DB::getCurrentExceptionMessage(true) << '\n'; |
| 224 | throw; |
| 225 | } |
| 226 | |