1#include <iostream>
2#include <optional>
3#include <boost/program_options.hpp>
4#include <boost/algorithm/string/join.hpp>
5
6#include <Common/Exception.h>
7#include <IO/WriteBufferFromFileDescriptor.h>
8#include <IO/ReadBufferFromFileDescriptor.h>
9#include <Compression/CompressedWriteBuffer.h>
10#include <Compression/CompressedReadBuffer.h>
11#include <IO/WriteHelpers.h>
12#include <IO/copyData.h>
13#include <Parsers/parseQuery.h>
14#include <Parsers/ExpressionElementParsers.h>
15#include <Compression/CompressionFactory.h>
16#include <Common/TerminalSize.h>
17
18
/// Error codes used in this file. They are only declared here; the definitions live elsewhere.
namespace DB::ErrorCodes
{
    extern const int TOO_LARGE_SIZE_COMPRESSED;
    extern const int BAD_ARGUMENTS;
}
27
28
29namespace
30{
31
32/// Outputs sizes of uncompressed and compressed blocks for compressed file.
33void checkAndWriteHeader(DB::ReadBuffer & in, DB::WriteBuffer & out)
34{
35 while (!in.eof())
36 {
37 in.ignore(16); /// checksum
38
39 char header[COMPRESSED_BLOCK_HEADER_SIZE];
40 in.readStrict(header, COMPRESSED_BLOCK_HEADER_SIZE);
41
42 UInt32 size_compressed = unalignedLoad<UInt32>(&header[1]);
43
44 if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
45 throw DB::Exception("Too large size_compressed. Most likely corrupted data.", DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
46
47 UInt32 size_decompressed = unalignedLoad<UInt32>(&header[5]);
48
49 DB::writeText(size_decompressed, out);
50 DB::writeChar('\t', out);
51 DB::writeText(size_compressed, out);
52 DB::writeChar('\n', out);
53
54 in.ignore(size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
55 }
56}
57
58}
59
60#pragma GCC diagnostic ignored "-Wunused-function"
61#pragma GCC diagnostic ignored "-Wmissing-declarations"
62
63int mainEntryClickHouseCompressor(int argc, char ** argv)
64{
65 boost::program_options::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
66 desc.add_options()
67 ("help,h", "produce help message")
68 ("decompress,d", "decompress")
69 ("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
70 ("hc", "use LZ4HC instead of LZ4")
71 ("zstd", "use ZSTD instead of LZ4")
72 ("codec", boost::program_options::value<std::vector<std::string>>()->multitoken(), "use codecs combination instead of LZ4")
73 ("level", boost::program_options::value<int>(), "compression level for codecs spicified via flags")
74 ("none", "use no compression instead of LZ4")
75 ("stat", "print block statistics of compressed data")
76 ;
77
78 boost::program_options::variables_map options;
79 boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
80
81 if (options.count("help"))
82 {
83 std::cout << "Usage: " << argv[0] << " [options] < in > out" << std::endl;
84 std::cout << desc << std::endl;
85 return 1;
86 }
87
88 try
89 {
90 bool decompress = options.count("decompress");
91 bool use_lz4hc = options.count("hc");
92 bool use_zstd = options.count("zstd");
93 bool stat_mode = options.count("stat");
94 bool use_none = options.count("none");
95 unsigned block_size = options["block-size"].as<unsigned>();
96 std::vector<std::string> codecs;
97 if (options.count("codec"))
98 codecs = options["codec"].as<std::vector<std::string>>();
99
100 if ((use_lz4hc || use_zstd || use_none) && !codecs.empty())
101 throw DB::Exception("Wrong options, codec flags like --zstd and --codec options are mutually exclusive", DB::ErrorCodes::BAD_ARGUMENTS);
102
103 if (!codecs.empty() && options.count("level"))
104 throw DB::Exception("Wrong options, --level is not compatible with --codec list", DB::ErrorCodes::BAD_ARGUMENTS);
105
106 std::string method_family = "LZ4";
107
108 if (use_lz4hc)
109 method_family = "LZ4HC";
110 else if (use_zstd)
111 method_family = "ZSTD";
112 else if (use_none)
113 method_family = "NONE";
114
115 std::optional<int> level = std::nullopt;
116 if (options.count("level"))
117 level = options["level"].as<int>();
118
119
120 DB::CompressionCodecPtr codec;
121 if (!codecs.empty())
122 {
123 DB::ParserCodec codec_parser;
124
125 std::string codecs_line = boost::algorithm::join(codecs, ",");
126 auto ast = DB::parseQuery(codec_parser, "(" + codecs_line + ")", 0);
127 codec = DB::CompressionCodecFactory::instance().get(ast, nullptr);
128 }
129 else
130 codec = DB::CompressionCodecFactory::instance().get(method_family, level);
131
132
133 DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
134 DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);
135
136 if (stat_mode)
137 {
138 /// Output statistic for compressed file.
139 checkAndWriteHeader(rb, wb);
140 }
141 else if (decompress)
142 {
143 /// Decompression
144 DB::CompressedReadBuffer from(rb);
145 DB::copyData(from, wb);
146 }
147 else
148 {
149 /// Compression
150 DB::CompressedWriteBuffer to(wb, codec, block_size);
151 DB::copyData(rb, to);
152 }
153 }
154 catch (...)
155 {
156 std::cerr << DB::getCurrentExceptionMessage(true);
157 return DB::getCurrentExceptionCode();
158 }
159
160 return 0;
161}
162