1#include <string.h>
2#include <random>
3#include <pcg_random.hpp>
4#include <common/likely.h>
5#include <common/Types.h>
6
7#include <IO/ReadBuffer.h>
8#include <IO/ReadBufferFromFileDescriptor.h>
9#include <IO/WriteBufferFromFileDescriptor.h>
10#include <IO/BufferWithOwnMemory.h>
11#include <Compression/CompressionInfo.h>
12#include <IO/WriteHelpers.h>
13#include <IO/copyData.h>
14#include <Common/PODArray.h>
15
16/** Quick and dirty implementation of data scrambler.
17 *
18 * The task is to replace the data with pseudorandom values.
19 * But with keeping some probability distributions
20 * and with maintaining the same compression ratio.
21 *
22 * The solution is to operate directly on compressed LZ4 stream.
23 * The stream consists of independent compressed blocks.
24 * Each block is a stream of "literals" and "matches".
 * A literal is an instruction to literally put some following bytes,
 * and a match is an instruction to copy some bytes that were already seen before.
27 *
28 * We get literals and apply some scramble operation on it.
29 * But we keep literal length and matches without changes.
30 *
31 * That's how we get pseudorandom data but with keeping
32 * all repetitive patterns and maintaining the same compression ratio.
33 *
 * Actually, if you decompress the scrambled data and compress it again,
 * the compression ratio becomes slightly worse, because LZ4 uses a simple match finder
 * based on the value of a hash function, and it can find different matches due to hash collisions.
37 *
 * The scramble operation replaces literals with pseudorandom bytes,
 * but uses some heuristics to keep some sort of data structure.
40 *
 * It is an open question whether the data is scrambled enough and whether it is safe to publish the scrambled data.
 * In general, you should assume that it is not safe.
43 */
44
45
/// Layout of the one-byte token that starts every LZ4 sequence:
/// the low ML_BITS bits hold the match length, the remaining high bits hold the literal run length.
/// A field equal to its mask means that the length continues in the following bytes.
static constexpr int ML_BITS = 4;
static constexpr unsigned ML_MASK = (1U << ML_BITS) - 1;
static constexpr int RUN_BITS = 8 - ML_BITS;
static constexpr unsigned RUN_MASK = (1U << RUN_BITS) - 1;

/// Value added to every decoded match length.
static constexpr int MINMATCH = 4;
/// Granularity of the "wild" copy, which may write up to this many bytes past the requested end.
static constexpr int WILDCOPYLENGTH = 8;
/// The last LASTLITERALS bytes of a block must be literals; a match may not cover them.
static constexpr int LASTLITERALS = 5;
54
55
56static UInt8 rand(pcg64 & generator, UInt8 min, UInt8 max)
57{
58 return min + generator() % (max + 1 - min);
59}
60
61static void mutate(pcg64 & generator, void * src, size_t length)
62{
63 UInt8 * pos = static_cast<UInt8 *>(src);
64 UInt8 * end = pos + length;
65
66 while (pos < end)
67 {
68 if (pos + strlen("https") <= end && 0 == memcmp(pos, "https", strlen("https")))
69 {
70 pos += strlen("https");
71 continue;
72 }
73
74 if (pos + strlen("http") <= end && 0 == memcmp(pos, "http", strlen("http")))
75 {
76 pos += strlen("http");
77 continue;
78 }
79
80 if (pos + strlen("www") <= end && 0 == memcmp(pos, "www", strlen("www")))
81 {
82 pos += strlen("www");
83 continue;
84 }
85
86 if (*pos >= '1' && *pos <= '9')
87 *pos = rand(generator, '1', '9');
88 else if (*pos >= 'a' && *pos <= 'z')
89 *pos = rand(generator, 'a', 'z');
90 else if (*pos >= 'A' && *pos <= 'Z')
91 *pos = rand(generator, 'A', 'Z');
92 else if (*pos >= 0x80 && *pos <= 0xBF)
93 *pos = rand(generator, *pos & 0xF0U, *pos | 0x0FU);
94 else if (*pos == '\\')
95 ++pos;
96
97 ++pos;
98 }
99
100 pos = static_cast<UInt8 *>(src);
101 while (pos < end)
102 {
103 if (pos + 3 <= end
104 && isAlphaASCII(pos[0])
105 && !isAlphaASCII(pos[1]) && pos[1] != '\\' && pos[1] >= 0x20
106 && isAlphaASCII(pos[2]))
107 {
108 auto res = rand(generator, 0, 3);
109 if (res == 2)
110 {
111 std::swap(pos[0], pos[1]);
112 }
113 else if (res == 3)
114 std::swap(pos[1], pos[2]);
115
116 pos += 3;
117 }
118 else if (pos + 5 <= end
119 && pos[0] >= 0xC0 && pos[0] <= 0xDF && pos[1] >= 0x80 && pos[1] <= 0xBF
120 && pos[2] >= 0x20 && pos[2] < 0x80 && !isAlphaASCII(pos[2])
121 && pos[3] >= 0xC0 && pos[3] <= 0xDF && pos[4] >= 0x80 && pos[4] <= 0xBF)
122 {
123 auto res = rand(generator, 0, 3);
124 if (res == 2)
125 {
126 std::swap(pos[1], pos[2]);
127 std::swap(pos[0], pos[1]);
128 }
129 else if (res == 3)
130 {
131 std::swap(pos[3], pos[2]);
132 std::swap(pos[4], pos[3]);
133 }
134
135 pos += 5;
136 }
137 else
138 ++pos;
139 }
140}
141
142
/// Copies exactly 8 bytes from `src` to `dst`. The regions must not overlap (plain memcpy).
static void LZ4_copy8(void* dst, const void* src)
{
    constexpr size_t chunk_size = 8;
    memcpy(dst, src, chunk_size);
}
147
148/* customized variant of memcpy, which can overwrite up to 8 bytes beyond dstEnd */
149static void LZ4_wildCopy(void* dstPtr, const void* srcPtr, void* dstEnd)
150{
151 UInt8* d = (UInt8*)dstPtr;
152 const UInt8* s = (const UInt8*)srcPtr;
153 UInt8* const e = (UInt8*)dstEnd;
154
155 do { LZ4_copy8(d,s); d+=8; s+=8; } while (d<e);
156}
157
158
159static UInt16 LZ4_read16(const void* memPtr)
160{
161 UInt16 val; memcpy(&val, memPtr, sizeof(val)); return val;
162}
163
164
165static void LZ4_write32(void* memPtr, UInt32 value)
166{
167 memcpy(memPtr, &value, sizeof(value));
168}
169
170
171int LZ4_decompress_mutate(
172 char* const source,
173 char* const dest,
174 int outputSize)
175{
176 pcg64 generator;
177
178 /* Local Variables */
179 UInt8* ip = (UInt8*) source;
180
181 UInt8* op = (UInt8*) dest;
182 UInt8* const oend = op + outputSize;
183 UInt8* cpy;
184
185 const unsigned dec32table[] = {0, 1, 2, 1, 4, 4, 4, 4};
186 const int dec64table[] = {0, 0, 0, -1, 0, 1, 2, 3};
187
188 /* Main Loop : decode sequences */
189 while (1) {
190 size_t length;
191 const UInt8* match;
192 size_t offset;
193
194 /* get literal length */
195 unsigned const token = *ip++;
196 if ((length=(token>>ML_BITS)) == RUN_MASK) {
197 unsigned s;
198 do {
199 s = *ip++;
200 length += s;
201 } while (s==255);
202 }
203
204 /* copy literals */
205 cpy = op+length;
206 if (cpy>oend-WILDCOPYLENGTH)
207 {
208 if (cpy != oend) goto _output_error; /* Error : block decoding must stop exactly there */
209 mutate(generator, ip, length);
210 memcpy(op, ip, length);
211 ip += length;
212 op += length;
213 break; /* Necessarily EOF, due to parsing restrictions */
214 }
215 mutate(generator, ip, cpy - op);
216 LZ4_wildCopy(op, ip, cpy);
217 ip += length; op = cpy;
218
219 /* get offset */
220 offset = LZ4_read16(ip); ip+=2;
221 match = op - offset;
222 LZ4_write32(op, (UInt32)offset); /* costs ~1%; silence an msan warning when offset==0 */
223
224 /* get matchlength */
225 length = token & ML_MASK;
226 if (length == ML_MASK) {
227 unsigned s;
228 do {
229 s = *ip++;
230 length += s;
231 } while (s==255);
232 }
233 length += MINMATCH;
234
235 /* copy match within block */
236 cpy = op + length;
237 if (unlikely(offset<8)) {
238 const int dec64 = dec64table[offset];
239 op[0] = match[0];
240 op[1] = match[1];
241 op[2] = match[2];
242 op[3] = match[3];
243 match += dec32table[offset];
244 memcpy(op+4, match, 4);
245 match -= dec64;
246 } else { LZ4_copy8(op, match); match+=8; }
247 op += 8;
248
249 if (unlikely(cpy>oend-12)) {
250 UInt8* const oCopyLimit = oend-(WILDCOPYLENGTH-1);
251 if (cpy > oend-LASTLITERALS) goto _output_error; /* Error : last LASTLITERALS bytes must be literals (uncompressed) */
252 if (op < oCopyLimit) {
253 LZ4_wildCopy(op, match, oCopyLimit);
254 match += oCopyLimit - op;
255 op = oCopyLimit;
256 }
257 while (op<cpy) *op++ = *match++;
258 } else {
259 LZ4_copy8(op, match);
260 if (length>16) LZ4_wildCopy(op+8, match+8, cpy);
261 }
262 op=cpy; /* correction */
263 }
264
265 return (int) (((const char*)ip)-source); /* Nb of input bytes read */
266
267 /* Overflow error detected */
268_output_error:
269 return (int) (-(((const char*)ip)-source))-1;
270}
271
272
273namespace DB
274{
275
/// Error codes used below; they are only declared here.
namespace ErrorCodes
{
    extern const int CANNOT_DECOMPRESS;
    extern const int TOO_LARGE_SIZE_COMPRESSED;
    extern const int UNKNOWN_COMPRESSION_METHOD;
}
282
283class MutatingCompressedReadBufferBase
284{
285protected:
286 ReadBuffer * compressed_in;
287
288 /// If 'compressed_in' buffer has whole compressed block - then use it. Otherwise copy parts of data to 'own_compressed_buffer'.
289 PODArray<char> own_compressed_buffer;
290 /// Points to memory, holding compressed block.
291 char * compressed_buffer = nullptr;
292
293 size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum)
294 {
295 if (compressed_in->eof())
296 return 0;
297
298 CityHash_v1_0_2::uint128 checksum;
299 compressed_in->readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum));
300
301 own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE);
302 compressed_in->readStrict(&own_compressed_buffer[0], COMPRESSED_BLOCK_HEADER_SIZE);
303
304 UInt8 method = own_compressed_buffer[0]; /// See CompressedWriteBuffer.h
305
306 size_t & size_compressed = size_compressed_without_checksum;
307
308 if (method == static_cast<UInt8>(CompressionMethodByte::LZ4) ||
309 method == static_cast<UInt8>(CompressionMethodByte::ZSTD) ||
310 method == static_cast<UInt8>(CompressionMethodByte::NONE))
311 {
312 size_compressed = unalignedLoad<UInt32>(&own_compressed_buffer[1]);
313 size_decompressed = unalignedLoad<UInt32>(&own_compressed_buffer[5]);
314 }
315 else
316 throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
317
318 if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
319 throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
320
321 /// Is whole compressed block located in 'compressed_in' buffer?
322 if (compressed_in->offset() >= COMPRESSED_BLOCK_HEADER_SIZE &&
323 compressed_in->position() + size_compressed - COMPRESSED_BLOCK_HEADER_SIZE <= compressed_in->buffer().end())
324 {
325 compressed_in->position() -= COMPRESSED_BLOCK_HEADER_SIZE;
326 compressed_buffer = compressed_in->position();
327 compressed_in->position() += size_compressed;
328 }
329 else
330 {
331 own_compressed_buffer.resize(size_compressed);
332 compressed_buffer = &own_compressed_buffer[0];
333 compressed_in->readStrict(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
334 }
335
336 return size_compressed + sizeof(checksum);
337 }
338
339 void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
340 {
341 UInt8 method = compressed_buffer[0]; /// See CompressedWriteBuffer.h
342
343 if (method == static_cast<UInt8>(CompressionMethodByte::LZ4))
344 {
345 if (LZ4_decompress_mutate(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, to, size_decompressed) < 0)
346 throw Exception("Cannot LZ4_decompress_fast", ErrorCodes::CANNOT_DECOMPRESS);
347 }
348 else
349 throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
350 }
351
352public:
353 /// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
354 MutatingCompressedReadBufferBase(ReadBuffer * in = nullptr)
355 : compressed_in(in), own_compressed_buffer(COMPRESSED_BLOCK_HEADER_SIZE)
356 {
357 }
358};
359
360
361class MutatingCompressedReadBuffer : public MutatingCompressedReadBufferBase, public BufferWithOwnMemory<ReadBuffer>
362{
363private:
364 size_t size_compressed = 0;
365
366 bool nextImpl() override
367 {
368 size_t size_decompressed;
369 size_t size_compressed_without_checksum;
370 size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
371 if (!size_compressed)
372 return false;
373
374 memory.resize(size_decompressed);
375 working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
376
377 decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
378
379 return true;
380 }
381
382public:
383 MutatingCompressedReadBuffer(ReadBuffer & in_)
384 : MutatingCompressedReadBufferBase(&in_), BufferWithOwnMemory<ReadBuffer>(0)
385 {
386 }
387};
388
389}
390
391
392int main(int, char **)
393try
394{
395 DB::ReadBufferFromFileDescriptor in(STDIN_FILENO);
396 DB::MutatingCompressedReadBuffer mutating_in(in);
397 DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO);
398
399 DB::copyData(mutating_in, out);
400
401 return 0;
402}
403catch (...)
404{
405 std::cerr << DB::getCurrentExceptionMessage(true);
406 return DB::getCurrentExceptionCode();
407}
408