1 | #include <string.h> |
2 | #include <random> |
3 | #include <pcg_random.hpp> |
4 | #include <common/likely.h> |
5 | #include <common/Types.h> |
6 | |
7 | #include <IO/ReadBuffer.h> |
8 | #include <IO/ReadBufferFromFileDescriptor.h> |
9 | #include <IO/WriteBufferFromFileDescriptor.h> |
10 | #include <IO/BufferWithOwnMemory.h> |
11 | #include <Compression/CompressionInfo.h> |
12 | #include <IO/WriteHelpers.h> |
13 | #include <IO/copyData.h> |
14 | #include <Common/PODArray.h> |
15 | |
/** Quick and dirty implementation of a data scrambler.
 *
 * The task is to replace the data with pseudorandom values,
 * while keeping some probability distributions
 * and maintaining the same compression ratio.
21 | * |
22 | * The solution is to operate directly on compressed LZ4 stream. |
23 | * The stream consists of independent compressed blocks. |
24 | * Each block is a stream of "literals" and "matches". |
 * A literal is an instruction to put the following bytes into the output as is,
 * and a match is an instruction to copy some bytes that were already seen before.
27 | * |
 * We take the literals and apply a scrambling operation to them,
 * but keep literal lengths and matches unchanged.
30 | * |
 * That is how we get pseudorandom data that still keeps
 * all repetitive patterns and the same compression ratio.
33 | * |
 * Actually, if you decompress the scrambled data and compress it again, the compression ratio
 * becomes slightly worse, because LZ4 uses a simple match finder based on the value of a hash function,
 * and it can find different matches due to collisions in the hash function.
37 | * |
 * The scramble operation replaces literals with pseudorandom bytes,
 * but uses some heuristics to keep some sort of data structure.
40 | * |
 * It is an open question whether this scrambles the data enough and whether it is safe to publish scrambled data.
 * In general, you should assume that it is not safe.
43 | */ |
44 | |
45 | |
/// Parameters of the LZ4 block format used by LZ4_decompress_mutate.
/// Typed constants (internal linkage) instead of object-like macros; names are kept for the decoder code.

/// A sequence token stores the match length in its low ML_BITS bits
/// and the literal run length in its high RUN_BITS bits.
constexpr int ML_BITS = 4;
constexpr unsigned ML_MASK = (1U << ML_BITS) - 1;
constexpr int RUN_BITS = 8 - ML_BITS;
constexpr unsigned RUN_MASK = (1U << RUN_BITS) - 1;

/// The encoded match length is stored minus MINMATCH.
constexpr int MINMATCH = 4;
/// Literal runs ending closer than this to the output end are copied with memcpy instead of LZ4_wildCopy.
constexpr int WILDCOPYLENGTH = 8;
/// The last LASTLITERALS bytes of a block must be literals.
constexpr int LASTLITERALS = 5;
54 | |
55 | |
56 | static UInt8 rand(pcg64 & generator, UInt8 min, UInt8 max) |
57 | { |
58 | return min + generator() % (max + 1 - min); |
59 | } |
60 | |
61 | static void mutate(pcg64 & generator, void * src, size_t length) |
62 | { |
63 | UInt8 * pos = static_cast<UInt8 *>(src); |
64 | UInt8 * end = pos + length; |
65 | |
66 | while (pos < end) |
67 | { |
68 | if (pos + strlen("https" ) <= end && 0 == memcmp(pos, "https" , strlen("https" ))) |
69 | { |
70 | pos += strlen("https" ); |
71 | continue; |
72 | } |
73 | |
74 | if (pos + strlen("http" ) <= end && 0 == memcmp(pos, "http" , strlen("http" ))) |
75 | { |
76 | pos += strlen("http" ); |
77 | continue; |
78 | } |
79 | |
80 | if (pos + strlen("www" ) <= end && 0 == memcmp(pos, "www" , strlen("www" ))) |
81 | { |
82 | pos += strlen("www" ); |
83 | continue; |
84 | } |
85 | |
86 | if (*pos >= '1' && *pos <= '9') |
87 | *pos = rand(generator, '1', '9'); |
88 | else if (*pos >= 'a' && *pos <= 'z') |
89 | *pos = rand(generator, 'a', 'z'); |
90 | else if (*pos >= 'A' && *pos <= 'Z') |
91 | *pos = rand(generator, 'A', 'Z'); |
92 | else if (*pos >= 0x80 && *pos <= 0xBF) |
93 | *pos = rand(generator, *pos & 0xF0U, *pos | 0x0FU); |
94 | else if (*pos == '\\') |
95 | ++pos; |
96 | |
97 | ++pos; |
98 | } |
99 | |
100 | pos = static_cast<UInt8 *>(src); |
101 | while (pos < end) |
102 | { |
103 | if (pos + 3 <= end |
104 | && isAlphaASCII(pos[0]) |
105 | && !isAlphaASCII(pos[1]) && pos[1] != '\\' && pos[1] >= 0x20 |
106 | && isAlphaASCII(pos[2])) |
107 | { |
108 | auto res = rand(generator, 0, 3); |
109 | if (res == 2) |
110 | { |
111 | std::swap(pos[0], pos[1]); |
112 | } |
113 | else if (res == 3) |
114 | std::swap(pos[1], pos[2]); |
115 | |
116 | pos += 3; |
117 | } |
118 | else if (pos + 5 <= end |
119 | && pos[0] >= 0xC0 && pos[0] <= 0xDF && pos[1] >= 0x80 && pos[1] <= 0xBF |
120 | && pos[2] >= 0x20 && pos[2] < 0x80 && !isAlphaASCII(pos[2]) |
121 | && pos[3] >= 0xC0 && pos[3] <= 0xDF && pos[4] >= 0x80 && pos[4] <= 0xBF) |
122 | { |
123 | auto res = rand(generator, 0, 3); |
124 | if (res == 2) |
125 | { |
126 | std::swap(pos[1], pos[2]); |
127 | std::swap(pos[0], pos[1]); |
128 | } |
129 | else if (res == 3) |
130 | { |
131 | std::swap(pos[3], pos[2]); |
132 | std::swap(pos[4], pos[3]); |
133 | } |
134 | |
135 | pos += 5; |
136 | } |
137 | else |
138 | ++pos; |
139 | } |
140 | } |
141 | |
142 | |
/// Copies exactly 8 bytes from `src` to `dst` (memcpy semantics: the ranges must not overlap).
/// memcpy is used rather than a pointer cast, so neither address needs to be aligned.
static void LZ4_copy8(void * dst, const void * src)
{
    static constexpr size_t copy_size = 8;
    memcpy(dst, src, copy_size);
}
147 | |
148 | /* customized variant of memcpy, which can overwrite up to 8 bytes beyond dstEnd */ |
149 | static void LZ4_wildCopy(void* dstPtr, const void* srcPtr, void* dstEnd) |
150 | { |
151 | UInt8* d = (UInt8*)dstPtr; |
152 | const UInt8* s = (const UInt8*)srcPtr; |
153 | UInt8* const e = (UInt8*)dstEnd; |
154 | |
155 | do { LZ4_copy8(d,s); d+=8; s+=8; } while (d<e); |
156 | } |
157 | |
158 | |
159 | static UInt16 LZ4_read16(const void* memPtr) |
160 | { |
161 | UInt16 val; memcpy(&val, memPtr, sizeof(val)); return val; |
162 | } |
163 | |
164 | |
165 | static void LZ4_write32(void* memPtr, UInt32 value) |
166 | { |
167 | memcpy(memPtr, &value, sizeof(value)); |
168 | } |
169 | |
170 | |
171 | int LZ4_decompress_mutate( |
172 | char* const source, |
173 | char* const dest, |
174 | int outputSize) |
175 | { |
176 | pcg64 generator; |
177 | |
178 | /* Local Variables */ |
179 | UInt8* ip = (UInt8*) source; |
180 | |
181 | UInt8* op = (UInt8*) dest; |
182 | UInt8* const oend = op + outputSize; |
183 | UInt8* cpy; |
184 | |
185 | const unsigned dec32table[] = {0, 1, 2, 1, 4, 4, 4, 4}; |
186 | const int dec64table[] = {0, 0, 0, -1, 0, 1, 2, 3}; |
187 | |
188 | /* Main Loop : decode sequences */ |
189 | while (1) { |
190 | size_t length; |
191 | const UInt8* match; |
192 | size_t offset; |
193 | |
194 | /* get literal length */ |
195 | unsigned const token = *ip++; |
196 | if ((length=(token>>ML_BITS)) == RUN_MASK) { |
197 | unsigned s; |
198 | do { |
199 | s = *ip++; |
200 | length += s; |
201 | } while (s==255); |
202 | } |
203 | |
204 | /* copy literals */ |
205 | cpy = op+length; |
206 | if (cpy>oend-WILDCOPYLENGTH) |
207 | { |
208 | if (cpy != oend) goto _output_error; /* Error : block decoding must stop exactly there */ |
209 | mutate(generator, ip, length); |
210 | memcpy(op, ip, length); |
211 | ip += length; |
212 | op += length; |
213 | break; /* Necessarily EOF, due to parsing restrictions */ |
214 | } |
215 | mutate(generator, ip, cpy - op); |
216 | LZ4_wildCopy(op, ip, cpy); |
217 | ip += length; op = cpy; |
218 | |
219 | /* get offset */ |
220 | offset = LZ4_read16(ip); ip+=2; |
221 | match = op - offset; |
222 | LZ4_write32(op, (UInt32)offset); /* costs ~1%; silence an msan warning when offset==0 */ |
223 | |
224 | /* get matchlength */ |
225 | length = token & ML_MASK; |
226 | if (length == ML_MASK) { |
227 | unsigned s; |
228 | do { |
229 | s = *ip++; |
230 | length += s; |
231 | } while (s==255); |
232 | } |
233 | length += MINMATCH; |
234 | |
235 | /* copy match within block */ |
236 | cpy = op + length; |
237 | if (unlikely(offset<8)) { |
238 | const int dec64 = dec64table[offset]; |
239 | op[0] = match[0]; |
240 | op[1] = match[1]; |
241 | op[2] = match[2]; |
242 | op[3] = match[3]; |
243 | match += dec32table[offset]; |
244 | memcpy(op+4, match, 4); |
245 | match -= dec64; |
246 | } else { LZ4_copy8(op, match); match+=8; } |
247 | op += 8; |
248 | |
249 | if (unlikely(cpy>oend-12)) { |
250 | UInt8* const oCopyLimit = oend-(WILDCOPYLENGTH-1); |
251 | if (cpy > oend-LASTLITERALS) goto _output_error; /* Error : last LASTLITERALS bytes must be literals (uncompressed) */ |
252 | if (op < oCopyLimit) { |
253 | LZ4_wildCopy(op, match, oCopyLimit); |
254 | match += oCopyLimit - op; |
255 | op = oCopyLimit; |
256 | } |
257 | while (op<cpy) *op++ = *match++; |
258 | } else { |
259 | LZ4_copy8(op, match); |
260 | if (length>16) LZ4_wildCopy(op+8, match+8, cpy); |
261 | } |
262 | op=cpy; /* correction */ |
263 | } |
264 | |
265 | return (int) (((const char*)ip)-source); /* Nb of input bytes read */ |
266 | |
267 | /* Overflow error detected */ |
268 | _output_error: |
269 | return (int) (-(((const char*)ip)-source))-1; |
270 | } |
271 | |
272 | |
273 | namespace DB |
274 | { |
275 | |
276 | namespace ErrorCodes |
277 | { |
278 | extern const int UNKNOWN_COMPRESSION_METHOD; |
279 | extern const int TOO_LARGE_SIZE_COMPRESSED; |
280 | extern const int CANNOT_DECOMPRESS; |
281 | } |
282 | |
283 | class MutatingCompressedReadBufferBase |
284 | { |
285 | protected: |
286 | ReadBuffer * compressed_in; |
287 | |
288 | /// If 'compressed_in' buffer has whole compressed block - then use it. Otherwise copy parts of data to 'own_compressed_buffer'. |
289 | PODArray<char> own_compressed_buffer; |
290 | /// Points to memory, holding compressed block. |
291 | char * compressed_buffer = nullptr; |
292 | |
293 | size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum) |
294 | { |
295 | if (compressed_in->eof()) |
296 | return 0; |
297 | |
298 | CityHash_v1_0_2::uint128 checksum; |
299 | compressed_in->readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum)); |
300 | |
301 | own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE); |
302 | compressed_in->readStrict(&own_compressed_buffer[0], COMPRESSED_BLOCK_HEADER_SIZE); |
303 | |
304 | UInt8 method = own_compressed_buffer[0]; /// See CompressedWriteBuffer.h |
305 | |
306 | size_t & size_compressed = size_compressed_without_checksum; |
307 | |
308 | if (method == static_cast<UInt8>(CompressionMethodByte::LZ4) || |
309 | method == static_cast<UInt8>(CompressionMethodByte::ZSTD) || |
310 | method == static_cast<UInt8>(CompressionMethodByte::NONE)) |
311 | { |
312 | size_compressed = unalignedLoad<UInt32>(&own_compressed_buffer[1]); |
313 | size_decompressed = unalignedLoad<UInt32>(&own_compressed_buffer[5]); |
314 | } |
315 | else |
316 | throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD); |
317 | |
318 | if (size_compressed > DBMS_MAX_COMPRESSED_SIZE) |
319 | throw Exception("Too large size_compressed. Most likely corrupted data." , ErrorCodes::TOO_LARGE_SIZE_COMPRESSED); |
320 | |
321 | /// Is whole compressed block located in 'compressed_in' buffer? |
322 | if (compressed_in->offset() >= COMPRESSED_BLOCK_HEADER_SIZE && |
323 | compressed_in->position() + size_compressed - COMPRESSED_BLOCK_HEADER_SIZE <= compressed_in->buffer().end()) |
324 | { |
325 | compressed_in->position() -= COMPRESSED_BLOCK_HEADER_SIZE; |
326 | compressed_buffer = compressed_in->position(); |
327 | compressed_in->position() += size_compressed; |
328 | } |
329 | else |
330 | { |
331 | own_compressed_buffer.resize(size_compressed); |
332 | compressed_buffer = &own_compressed_buffer[0]; |
333 | compressed_in->readStrict(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, size_compressed - COMPRESSED_BLOCK_HEADER_SIZE); |
334 | } |
335 | |
336 | return size_compressed + sizeof(checksum); |
337 | } |
338 | |
339 | void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum) |
340 | { |
341 | UInt8 method = compressed_buffer[0]; /// See CompressedWriteBuffer.h |
342 | |
343 | if (method == static_cast<UInt8>(CompressionMethodByte::LZ4)) |
344 | { |
345 | if (LZ4_decompress_mutate(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, to, size_decompressed) < 0) |
346 | throw Exception("Cannot LZ4_decompress_fast" , ErrorCodes::CANNOT_DECOMPRESS); |
347 | } |
348 | else |
349 | throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD); |
350 | } |
351 | |
352 | public: |
353 | /// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'. |
354 | MutatingCompressedReadBufferBase(ReadBuffer * in = nullptr) |
355 | : compressed_in(in), own_compressed_buffer(COMPRESSED_BLOCK_HEADER_SIZE) |
356 | { |
357 | } |
358 | }; |
359 | |
360 | |
361 | class MutatingCompressedReadBuffer : public MutatingCompressedReadBufferBase, public BufferWithOwnMemory<ReadBuffer> |
362 | { |
363 | private: |
364 | size_t size_compressed = 0; |
365 | |
366 | bool nextImpl() override |
367 | { |
368 | size_t size_decompressed; |
369 | size_t size_compressed_without_checksum; |
370 | size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum); |
371 | if (!size_compressed) |
372 | return false; |
373 | |
374 | memory.resize(size_decompressed); |
375 | working_buffer = Buffer(&memory[0], &memory[size_decompressed]); |
376 | |
377 | decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum); |
378 | |
379 | return true; |
380 | } |
381 | |
382 | public: |
383 | MutatingCompressedReadBuffer(ReadBuffer & in_) |
384 | : MutatingCompressedReadBufferBase(&in_), BufferWithOwnMemory<ReadBuffer>(0) |
385 | { |
386 | } |
387 | }; |
388 | |
389 | } |
390 | |
391 | |
392 | int main(int, char **) |
393 | try |
394 | { |
395 | DB::ReadBufferFromFileDescriptor in(STDIN_FILENO); |
396 | DB::MutatingCompressedReadBuffer mutating_in(in); |
397 | DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO); |
398 | |
399 | DB::copyData(mutating_in, out); |
400 | |
401 | return 0; |
402 | } |
403 | catch (...) |
404 | { |
405 | std::cerr << DB::getCurrentExceptionMessage(true); |
406 | return DB::getCurrentExceptionCode(); |
407 | } |
408 | |