1#include <Columns/IColumn.h>
2#include <Columns/ColumnVector.h>
3#include <Columns/ColumnString.h>
4#include <Columns/ColumnArray.h>
5#include <Columns/ColumnNullable.h>
6#include <Columns/ColumnFixedString.h>
7#include <DataTypes/IDataType.h>
8#include <DataTypes/DataTypesNumber.h>
9#include <DataTypes/DataTypeDate.h>
10#include <DataTypes/DataTypeDateTime.h>
11#include <DataTypes/DataTypeString.h>
12#include <DataTypes/DataTypeFixedString.h>
13#include <DataTypes/DataTypeArray.h>
14#include <DataTypes/DataTypeNullable.h>
15#include <DataTypes/DataTypeFactory.h>
16#include <Interpreters/Context.h>
17#include <DataStreams/IBlockInputStream.h>
18#include <DataStreams/IBlockOutputStream.h>
19#include <DataStreams/LimitBlockInputStream.h>
20#include <Common/SipHash.h>
21#include <Common/UTF8Helpers.h>
22#include <Common/StringUtils/StringUtils.h>
23#include <Common/HashTable/HashMap.h>
24#include <Common/typeid_cast.h>
25#include <Common/assert_cast.h>
26#include <Core/Block.h>
27#include <common/StringRef.h>
28#include <common/DateLUT.h>
29#include <IO/ReadBufferFromFileDescriptor.h>
30#include <IO/WriteBufferFromFileDescriptor.h>
31#include <ext/bit_cast.h>
32#include <memory>
33#include <cmath>
34#include <optional>
35#include <unistd.h>
36#include <boost/program_options/options_description.hpp>
37#include <boost/program_options.hpp>
38#include <boost/algorithm/string.hpp>
39#include <boost/container/flat_map.hpp>
40#include <Common/TerminalSize.h>
41
42
/// User-facing help text, printed by mainEntryClickHouseObfuscator.
/// NOTE: the identifier is misspelled ("documantation"), but it is kept as is
/// because it is referenced elsewhere in this file.
static const char * documantation = R"(
Simple tool for table data obfuscation.

It reads an input table and produces an output table that retains some properties of the input, but contains different data.
It allows to publish almost real production data for usage in benchmarks.

It is designed to retain the following properties of data:
- cardinalities of values (number of distinct values) for every column and for every tuple of columns;
- conditional cardinalities: number of distinct values of one column under condition on value of another column;
- probability distributions of absolute value of integers; sign of signed integers; exponent and sign for floats;
- probability distributions of length of strings;
- probability of zero values of numbers; empty strings and arrays, NULLs;
- data compression ratio when compressed with LZ77 and entropy family of codecs;
- continuity (magnitude of difference) of time values across table; continuity of floating point values.
- date component of DateTime values;
- UTF-8 validity of string values;
- string values continue to look somewhat natural.

Most of the properties above are viable for performance testing:
- reading data, filtering, aggregation and sorting will work at almost the same speed
    as on original data due to saved cardinalities, magnitudes, compression ratios, etc.

It works in deterministic fashion: you define a seed value and the transform is totally determined by input data and by seed.
Some transforms are one to one and could be reversed, so you need to have large enough seed and keep it in secret.

It uses some cryptographic primitives to transform data, but from the cryptographic point of view,
    it doesn't do anything properly and you should never consider the result as secure, unless you have other reasons for it.

It may retain some data you don't want to publish.

It always leaves numbers 0, 1, -1 as is. Also it leaves dates, lengths of arrays and null flags exactly as in source data.
For example, you have a column IsMobile in your table with values 0 and 1. In transformed data, it will have the same value.
So, the user will be able to count the exact ratio of mobile traffic.

Another example: suppose you have some private data in your table, like user email, and you don't want to publish any single email address.
If your table is large enough and contains multiple different emails and there is no email that has a much higher frequency than all the others,
    it will perfectly anonymize all data. But if you have a small amount of different values in a column, it can possibly reproduce some of them.
And you should take care and look at the exact algorithm of how this tool works, and probably fine tune some of its command line parameters.

This tool works fine only with a reasonable amount of data (at least 1000s of rows).
)";
84
85
86namespace DB
87{
88
namespace ErrorCodes
{
    /// Error codes live in a common registry elsewhere in the project;
    /// only the ones thrown by this tool are declared here.
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int CANNOT_SEEK_THROUGH_FILE;
}
95
96
97/// Model is used to transform columns with source data to columns
98/// with similar by structure and by probability distributions but anonymized data.
99class IModel
100{
101public:
102 /// Call train iteratively for each block to train a model.
103 virtual void train(const IColumn & column);
104
105 /// Call finalize one time after training before generating.
106 virtual void finalize();
107
108 /// Call generate: pass source data column to obtain a column with anonymized data as a result.
109 virtual ColumnPtr generate(const IColumn & column);
110
111 virtual ~IModel() {}
112};
113
114using ModelPtr = std::unique_ptr<IModel>;
115
116
117template <typename... Ts>
118UInt64 hash(Ts... xs)
119{
120 SipHash hash;
121 (hash.update(xs), ...);
122 return hash.get64();
123}
124
125
126static UInt64 maskBits(UInt64 x, size_t num_bits)
127{
128 return x & ((1ULL << num_bits) - 1);
129}
130
131
/// Apply Feistel network round to least significant num_bits part of x.
static UInt64 feistelRound(UInt64 x, size_t num_bits, UInt64 seed, size_t round)
{
    /// Split the low num_bits of x into two halves; for odd num_bits the right half is one bit wider.
    size_t num_bits_left_half = num_bits / 2;
    size_t num_bits_right_half = num_bits - num_bits_left_half;

    UInt64 left_half = maskBits(x >> num_bits_right_half, num_bits_left_half);
    UInt64 right_half = maskBits(x, num_bits_right_half);

    /// Feistel step: halves are swapped, and the new right half is the old left half
    /// XOR-ed with a keyed round function of the old right half.
    /// Each round is a bijection on num_bits-bit values, so the network is a permutation.
    UInt64 new_left_half = right_half;
    UInt64 new_right_half = left_half ^ maskBits(hash(right_half, seed, round), num_bits_left_half);

    /// Repack: the new left half is placed above the (num_bits_left_half wide) new right half.
    return (new_left_half << num_bits_left_half) ^ new_right_half;
}
146
147
148/// Apply Feistel network with num_rounds to least significant num_bits part of x.
149static UInt64 feistelNetwork(UInt64 x, size_t num_bits, UInt64 seed, size_t num_rounds = 4)
150{
151 UInt64 bits = maskBits(x, num_bits);
152 for (size_t i = 0; i < num_rounds; ++i)
153 bits = feistelRound(bits, num_bits, seed, i);
154 return (x & ~((1ULL << num_bits) - 1)) ^ bits;
155}
156
157
158/// Pseudorandom permutation within set of numbers with the same log2(x).
159static UInt64 transform(UInt64 x, UInt64 seed)
160{
161 /// Keep 0 and 1 as is.
162 if (x == 0 || x == 1)
163 return x;
164
165 /// Pseudorandom permutation of two elements.
166 if (x == 2 || x == 3)
167 return x ^ (seed & 1);
168
169 size_t num_leading_zeros = __builtin_clzll(x);
170
171 return feistelNetwork(x, 64 - num_leading_zeros - 1, seed);
172}
173
174
175class UnsignedIntegerModel : public IModel
176{
177private:
178 const UInt64 seed;
179
180public:
181 UnsignedIntegerModel(UInt64 seed_) : seed(seed_) {}
182
183 void train(const IColumn &) override {}
184 void finalize() override {}
185
186 ColumnPtr generate(const IColumn & column) override
187 {
188 MutableColumnPtr res = column.cloneEmpty();
189
190 size_t size = column.size();
191 res->reserve(size);
192
193 for (size_t i = 0; i < size; ++i)
194 res->insert(transform(column.getUInt(i), seed));
195
196 return res;
197 }
198};
199
200
201/// Keep sign and apply pseudorandom permutation after converting to unsigned as above.
202static Int64 transformSigned(Int64 x, UInt64 seed)
203{
204 if (x >= 0)
205 return transform(x, seed);
206 else
207 return -transform(-x, seed); /// It works Ok even for minimum signed number.
208}
209
210
211class SignedIntegerModel : public IModel
212{
213private:
214 const UInt64 seed;
215
216public:
217 SignedIntegerModel(UInt64 seed_) : seed(seed_) {}
218
219 void train(const IColumn &) override {}
220 void finalize() override {}
221
222 ColumnPtr generate(const IColumn & column) override
223 {
224 MutableColumnPtr res = column.cloneEmpty();
225
226 size_t size = column.size();
227 res->reserve(size);
228
229 for (size_t i = 0; i < size; ++i)
230 res->insert(transformSigned(column.getInt(i), seed));
231
232 return res;
233 }
234};
235
236
237/// Pseudorandom permutation of mantissa.
238template <typename Float>
239Float transformFloatMantissa(Float x, UInt64 seed)
240{
241 using UInt = std::conditional_t<std::is_same_v<Float, Float32>, UInt32, UInt64>;
242 constexpr size_t mantissa_num_bits = std::is_same_v<Float, Float32> ? 23 : 52;
243
244 UInt x_uint = ext::bit_cast<UInt>(x);
245 x_uint = feistelNetwork(x_uint, mantissa_num_bits, seed);
246 return ext::bit_cast<Float>(x_uint);
247}
248
249
250/// Transform difference from previous number by applying pseudorandom permutation to mantissa part of it.
251/// It allows to retain some continuity property of source data.
252template <typename Float>
253class FloatModel : public IModel
254{
255private:
256 const UInt64 seed;
257 Float src_prev_value = 0;
258 Float res_prev_value = 0;
259
260public:
261 FloatModel(UInt64 seed_) : seed(seed_) {}
262
263 void train(const IColumn &) override {}
264 void finalize() override {}
265
266 ColumnPtr generate(const IColumn & column) override
267 {
268 const auto & src_data = assert_cast<const ColumnVector<Float> &>(column).getData();
269 size_t size = src_data.size();
270
271 auto res_column = ColumnVector<Float>::create(size);
272 auto & res_data = assert_cast<ColumnVector<Float> &>(*res_column).getData();
273
274 for (size_t i = 0; i < size; ++i)
275 {
276 res_data[i] = res_prev_value + transformFloatMantissa(src_data[i] - src_prev_value, seed);
277 src_prev_value = src_data[i];
278 res_prev_value = res_data[i];
279 }
280
281 return res_column;
282 }
283};
284
285
286/// Leave all data as is. For example, it is used for columns of type Date.
287class IdentityModel : public IModel
288{
289public:
290 void train(const IColumn &) override {}
291 void finalize() override {}
292
293 ColumnPtr generate(const IColumn & column) override
294 {
295 return column.cloneResized(column.size());
296 }
297};
298
299
/// Pseudorandom function, but keep word characters as word characters.
static void transformFixedString(const UInt8 * src, UInt8 * dst, size_t size, UInt64 seed)
{
    {
        /// Derive a per-string seed from the global seed and the source bytes,
        /// so that equal source strings always map to equal results.
        SipHash hash;
        hash.update(seed);
        hash.update(reinterpret_cast<const char *>(src), size);
        seed = hash.get64();
    }

    UInt8 * pos = dst;
    UInt8 * end = dst + size;

    /// Fill dst with a pseudorandom byte stream, one 16-byte SipHash-128 output per iteration.
    size_t i = 0;
    while (pos < end)
    {
        SipHash hash;
        hash.update(seed);
        hash.update(i);

        if (size >= 16)
        {
            /// For the final partial chunk, back up so the 16-byte write stays inside the buffer.
            /// Overlapping the previous chunk is harmless: those bytes are simply overwritten.
            char * hash_dst = reinterpret_cast<char *>(std::min(pos, end - 16));
            hash.get128(hash_dst);
        }
        else
        {
            /// Buffer shorter than one hash output: copy only the bytes that fit.
            /// (In this branch the loop runs exactly once, so pos == dst here.)
            char value[16];
            hash.get128(value);
            memcpy(dst, value, end - dst);
        }

        pos += 16;
        ++i;
    }

    /// Post-process: wherever the source had a word character ([0-9a-zA-Z_]),
    /// fold the pseudorandom byte back into the word-character alphabet,
    /// so identifiers still look like identifiers.
    for (size_t j = 0; j < size; ++j)
    {
        if (isWordCharASCII(src[j]))
        {
            static constexpr char word_chars[] = "_01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
            dst[j] = word_chars[dst[j] % (sizeof(word_chars) - 1)];
        }
    }
}
345
346
347class FixedStringModel : public IModel
348{
349private:
350 const UInt64 seed;
351
352public:
353 FixedStringModel(UInt64 seed_) : seed(seed_) {}
354
355 void train(const IColumn &) override {}
356 void finalize() override {}
357
358 ColumnPtr generate(const IColumn & column) override
359 {
360 const ColumnFixedString & column_fixed_string = assert_cast<const ColumnFixedString &>(column);
361 const size_t string_size = column_fixed_string.getN();
362
363 const auto & src_data = column_fixed_string.getChars();
364 size_t size = column_fixed_string.size();
365
366 auto res_column = ColumnFixedString::create(string_size);
367 auto & res_data = res_column->getChars();
368
369 res_data.resize(src_data.size());
370
371 for (size_t i = 0; i < size; ++i)
372 transformFixedString(&src_data[i * string_size], &res_data[i * string_size], string_size, seed);
373
374 return res_column;
375 }
376};
377
378
379/// Leave date part as is and apply pseudorandom permutation to time difference with previous value within the same log2 class.
380class DateTimeModel : public IModel
381{
382private:
383 const UInt64 seed;
384 UInt32 src_prev_value = 0;
385 UInt32 res_prev_value = 0;
386
387 const DateLUTImpl & date_lut;
388
389public:
390 DateTimeModel(UInt64 seed_) : seed(seed_), date_lut(DateLUT::instance()) {}
391
392 void train(const IColumn &) override {}
393 void finalize() override {}
394
395 ColumnPtr generate(const IColumn & column) override
396 {
397 const auto & src_data = assert_cast<const ColumnVector<UInt32> &>(column).getData();
398 size_t size = src_data.size();
399
400 auto res_column = ColumnVector<UInt32>::create(size);
401 auto & res_data = assert_cast<ColumnVector<UInt32> &>(*res_column).getData();
402
403 for (size_t i = 0; i < size; ++i)
404 {
405 UInt32 src_datetime = src_data[i];
406 UInt32 src_date = date_lut.toDate(src_datetime);
407
408 Int32 src_diff = src_datetime - src_prev_value;
409 Int32 res_diff = transformSigned(src_diff, seed);
410
411 UInt32 new_datetime = res_prev_value + res_diff;
412 UInt32 new_time = new_datetime - date_lut.toDate(new_datetime);
413 res_data[i] = src_date + new_time;
414
415 src_prev_value = src_datetime;
416 res_prev_value = res_data[i];
417 }
418
419 return res_column;
420 }
421};
422
423
/// Tuning knobs for MarkovModel. Fields are zero-initialized by default so an
/// accidentally unset field means "feature disabled" rather than indeterminate garbage.
struct MarkovModelParameters
{
    size_t order = 0;                            /// Context length: a mix of order-0..order models is used.
    size_t frequency_cutoff = 0;                 /// Remove buckets with count below this value.
    size_t num_buckets_cutoff = 0;               /// Remove histograms with fewer buckets than this.
    size_t frequency_add = 0;                    /// Additive smoothing constant for every count.
    double frequency_desaturate = 0.0;           /// 0..1 - move every frequency towards the average.
    size_t determinator_sliding_window_size = 0; /// Bytes of source string hashed to seed the RNG.
};
433
434
435/** Actually it's not an order-N model, but a mix of order-{0..N} models.
436 *
437 * We calculate code point counts for every context of 0..N previous code points.
438 * Then throw off some context with low amount of statistics.
439 *
440 * When generating data, we try to find statistics for a context of maximum order.
441 * And if not found - use context of smaller order, up to 0.
442 */
443class MarkovModel
444{
445private:
446 using CodePoint = UInt32;
447 using NGramHash = UInt32;
448
449 struct Histogram
450 {
451 UInt64 total = 0; /// Not including count_end.
452 UInt64 count_end = 0;
453 using Buckets = boost::container::flat_map<CodePoint, UInt64>;
454 Buckets buckets;
455
456 void add(CodePoint code)
457 {
458 ++total;
459 ++buckets[code];
460 }
461
462 void addEnd()
463 {
464 ++count_end;
465 }
466
467 CodePoint sample(UInt64 random, double end_multiplier) const
468 {
469 UInt64 range = total + UInt64(count_end * end_multiplier);
470 if (range == 0)
471 return END;
472
473 random %= range;
474
475 UInt64 sum = 0;
476 for (const auto & elem : buckets)
477 {
478 sum += elem.second;
479 if (sum > random)
480 return elem.first;
481 }
482
483 return END;
484 }
485 };
486
487 using Table = HashMap<NGramHash, Histogram, TrivialHash>;
488 Table table;
489
490 MarkovModelParameters params;
491
492 std::vector<CodePoint> code_points;
493
494 /// Special code point to form context before beginning of string.
495 static constexpr CodePoint BEGIN = -1;
496 /// Special code point to indicate end of string.
497 static constexpr CodePoint END = -2;
498
499
500 NGramHash hashContext(const CodePoint * begin, const CodePoint * end) const
501 {
502 return CRC32Hash()(StringRef(reinterpret_cast<const char *>(begin), (end - begin) * sizeof(CodePoint)));
503 }
504
505 /// By the way, we don't have to use actual Unicode numbers. We use just arbitrary bijective mapping.
506 CodePoint readCodePoint(const char *& pos, const char * end)
507 {
508 size_t length = UTF8::seqLength(*pos);
509
510 if (pos + length > end)
511 length = end - pos;
512 if (length > sizeof(CodePoint))
513 length = sizeof(CodePoint);
514
515 CodePoint res = 0;
516 memcpy(&res, pos, length);
517 pos += length;
518 return res;
519 }
520
521 bool writeCodePoint(CodePoint code, char *& pos, char * end)
522 {
523 size_t length
524 = (code & 0xFF000000) ? 4
525 : (code & 0xFFFF0000) ? 3
526 : (code & 0xFFFFFF00) ? 2
527 : 1;
528
529 if (pos + length > end)
530 return false;
531
532 memcpy(pos, &code, length);
533 pos += length;
534 return true;
535 }
536
537public:
538 MarkovModel(MarkovModelParameters params_)
539 : params(std::move(params_)), code_points(params.order, BEGIN) {}
540
541 void consume(const char * data, size_t size)
542 {
543 /// First 'order' number of code points are pre-filled with BEGIN.
544 code_points.resize(params.order);
545
546 const char * pos = data;
547 const char * end = data + size;
548
549 while (true)
550 {
551 const bool inside = pos < end;
552
553 CodePoint next_code_point {};
554
555 if (inside)
556 next_code_point = readCodePoint(pos, end);
557
558 for (size_t context_size = 0; context_size < params.order; ++context_size)
559 {
560 NGramHash context_hash = hashContext(code_points.data() + code_points.size() - context_size, code_points.data() + code_points.size());
561
562 if (inside)
563 table[context_hash].add(next_code_point);
564 else /// if (context_size != 0 || order == 0) /// Don't allow to break string without context (except order-0 model).
565 table[context_hash].addEnd();
566 }
567
568 if (inside)
569 code_points.push_back(next_code_point);
570 else
571 break;
572 }
573 }
574
575
576 void finalize()
577 {
578 if (params.num_buckets_cutoff)
579 {
580 for (auto & elem : table)
581 {
582 Histogram & histogram = elem.getMapped();
583
584 if (histogram.buckets.size() < params.num_buckets_cutoff)
585 {
586 histogram.buckets.clear();
587 histogram.total = 0;
588 }
589 }
590 }
591
592 if (params.frequency_cutoff)
593 {
594 for (auto & elem : table)
595 {
596 Histogram & histogram = elem.getMapped();
597 if (!histogram.total)
598 continue;
599
600 if (histogram.total + histogram.count_end < params.frequency_cutoff)
601 {
602 histogram.buckets.clear();
603 histogram.total = 0;
604 }
605 else
606 {
607 Histogram::Buckets new_buckets;
608 UInt64 erased_count = 0;
609
610 for (const auto & bucket : histogram.buckets)
611 {
612 if (bucket.second >= params.frequency_cutoff)
613 new_buckets.emplace(bucket);
614 else
615 erased_count += bucket.second;
616 }
617
618 histogram.buckets.swap(new_buckets);
619 histogram.total -= erased_count;
620 }
621 }
622 }
623
624 if (params.frequency_add)
625 {
626 for (auto & elem : table)
627 {
628 Histogram & histogram = elem.getMapped();
629 if (!histogram.total)
630 continue;
631
632 for (auto & bucket : histogram.buckets)
633 bucket.second += params.frequency_add;
634
635 histogram.count_end += params.frequency_add;
636 histogram.total += params.frequency_add * histogram.buckets.size();
637 }
638 }
639
640 if (params.frequency_desaturate)
641 {
642 for (auto & elem : table)
643 {
644 Histogram & histogram = elem.getMapped();
645 if (!histogram.total)
646 continue;
647
648 double average = histogram.total / histogram.buckets.size();
649
650 UInt64 new_total = 0;
651 for (auto & bucket : histogram.buckets)
652 {
653 bucket.second = bucket.second * (1.0 - params.frequency_desaturate) + average * params.frequency_desaturate;
654 new_total += bucket.second;
655 }
656
657 histogram.total = new_total;
658 }
659 }
660 }
661
662
663 size_t generate(char * data, size_t desired_size, size_t buffer_size,
664 UInt64 seed, const char * determinator_data, size_t determinator_size)
665 {
666 code_points.resize(params.order);
667
668 char * pos = data;
669 char * end = data + buffer_size;
670
671 while (pos < end)
672 {
673 Table::LookupResult it;
674
675 size_t context_size = params.order;
676 while (true)
677 {
678 it = table.find(hashContext(code_points.data() + code_points.size() - context_size, code_points.data() + code_points.size()));
679 if (it && it->getMapped().total + it->getMapped().count_end != 0)
680 break;
681
682 if (context_size == 0)
683 break;
684 --context_size;
685 }
686
687 if (!it)
688 throw Exception("Logical error in markov model", ErrorCodes::LOGICAL_ERROR);
689
690 size_t offset_from_begin_of_string = pos - data;
691 size_t determinator_sliding_window_size = params.determinator_sliding_window_size;
692 if (determinator_sliding_window_size > determinator_size)
693 determinator_sliding_window_size = determinator_size;
694
695 size_t determinator_sliding_window_overflow = offset_from_begin_of_string + determinator_sliding_window_size > determinator_size
696 ? offset_from_begin_of_string + determinator_sliding_window_size - determinator_size : 0;
697
698 const char * determinator_sliding_window_begin = determinator_data + offset_from_begin_of_string - determinator_sliding_window_overflow;
699
700 SipHash hash;
701 hash.update(seed);
702 hash.update(determinator_sliding_window_begin, determinator_sliding_window_size);
703 hash.update(determinator_sliding_window_overflow);
704 UInt64 determinator = hash.get64();
705
706 /// If string is greater than desired_size, increase probability of end.
707 double end_probability_multiplier = 0;
708 Int64 num_bytes_after_desired_size = (pos - data) - desired_size;
709
710 if (num_bytes_after_desired_size > 0)
711 end_probability_multiplier = std::pow(1.25, num_bytes_after_desired_size);
712
713 CodePoint code = it->getMapped().sample(determinator, end_probability_multiplier);
714
715 if (code == END)
716 break;
717
718 if (num_bytes_after_desired_size > 0)
719 {
720 /// Heuristic: break at ASCII non-alnum code point.
721 /// This allows to be close to desired_size but not break natural looking words.
722 if (code < 128 && !isAlphaNumericASCII(code))
723 break;
724 }
725
726 if (!writeCodePoint(code, pos, end))
727 break;
728
729 code_points.push_back(code);
730 }
731
732 return pos - data;
733 }
734};
735
736
737/// Generate length of strings as above.
738/// To generate content of strings, use
739/// order-N Markov model on Unicode code points,
740/// and to generate next code point use deterministic RNG
741/// determined by hash of a sliding window (default 8 bytes) of source string.
742/// This is intended to generate locally-similar strings from locally-similar sources.
743class StringModel : public IModel
744{
745private:
746 UInt64 seed;
747 MarkovModel markov_model;
748
749public:
750 StringModel(UInt64 seed_, MarkovModelParameters params_) : seed(seed_), markov_model(std::move(params_)) {}
751
752 void train(const IColumn & column) override
753 {
754 const ColumnString & column_string = assert_cast<const ColumnString &>(column);
755 size_t size = column_string.size();
756
757 for (size_t i = 0; i < size; ++i)
758 {
759 StringRef string = column_string.getDataAt(i);
760 markov_model.consume(string.data, string.size);
761 }
762 }
763
764 void finalize() override
765 {
766 markov_model.finalize();
767 }
768
769 ColumnPtr generate(const IColumn & column) override
770 {
771 const ColumnString & column_string = assert_cast<const ColumnString &>(column);
772 size_t size = column_string.size();
773
774 auto res_column = ColumnString::create();
775 res_column->reserve(size);
776
777 std::string new_string;
778 for (size_t i = 0; i < size; ++i)
779 {
780 StringRef src_string = column_string.getDataAt(i);
781 size_t desired_string_size = transform(src_string.size, seed);
782 new_string.resize(desired_string_size * 2);
783
784 size_t actual_size = 0;
785 if (desired_string_size != 0)
786 actual_size = markov_model.generate(new_string.data(), desired_string_size, new_string.size(), seed, src_string.data, src_string.size);
787
788 res_column->insertData(new_string.data(), actual_size);
789 }
790
791 return res_column;
792 }
793};
794
795
796class ArrayModel : public IModel
797{
798private:
799 ModelPtr nested_model;
800
801public:
802 ArrayModel(ModelPtr nested_model_) : nested_model(std::move(nested_model_)) {}
803
804 void train(const IColumn & column) override
805 {
806 const ColumnArray & column_array = assert_cast<const ColumnArray &>(column);
807 const IColumn & nested_column = column_array.getData();
808
809 nested_model->train(nested_column);
810 }
811
812 void finalize() override
813 {
814 nested_model->finalize();
815 }
816
817 ColumnPtr generate(const IColumn & column) override
818 {
819 const ColumnArray & column_array = assert_cast<const ColumnArray &>(column);
820 const IColumn & nested_column = column_array.getData();
821
822 ColumnPtr new_nested_column = nested_model->generate(nested_column);
823
824 return ColumnArray::create((*std::move(new_nested_column)).mutate(), (*std::move(column_array.getOffsetsPtr())).mutate());
825 }
826};
827
828
829class NullableModel : public IModel
830{
831private:
832 ModelPtr nested_model;
833
834public:
835 NullableModel(ModelPtr nested_model_) : nested_model(std::move(nested_model_)) {}
836
837 void train(const IColumn & column) override
838 {
839 const ColumnNullable & column_nullable = assert_cast<const ColumnNullable &>(column);
840 const IColumn & nested_column = column_nullable.getNestedColumn();
841
842 nested_model->train(nested_column);
843 }
844
845 void finalize() override
846 {
847 nested_model->finalize();
848 }
849
850 ColumnPtr generate(const IColumn & column) override
851 {
852 const ColumnNullable & column_nullable = assert_cast<const ColumnNullable &>(column);
853 const IColumn & nested_column = column_nullable.getNestedColumn();
854
855 ColumnPtr new_nested_column = nested_model->generate(nested_column);
856
857 return ColumnNullable::create((*std::move(new_nested_column)).mutate(), (*std::move(column_nullable.getNullMapColumnPtr())).mutate());
858 }
859};
860
861
862class ModelFactory
863{
864public:
865 ModelPtr get(const IDataType & data_type, UInt64 seed, MarkovModelParameters markov_model_params) const
866 {
867 if (isInteger(data_type))
868 {
869 if (isUnsignedInteger(data_type))
870 return std::make_unique<UnsignedIntegerModel>(seed);
871 else
872 return std::make_unique<SignedIntegerModel>(seed);
873 }
874
875 if (typeid_cast<const DataTypeFloat32 *>(&data_type))
876 return std::make_unique<FloatModel<Float32>>(seed);
877
878 if (typeid_cast<const DataTypeFloat64 *>(&data_type))
879 return std::make_unique<FloatModel<Float64>>(seed);
880
881 if (typeid_cast<const DataTypeDate *>(&data_type))
882 return std::make_unique<IdentityModel>();
883
884 if (typeid_cast<const DataTypeDateTime *>(&data_type))
885 return std::make_unique<DateTimeModel>(seed);
886
887 if (typeid_cast<const DataTypeString *>(&data_type))
888 return std::make_unique<StringModel>(seed, markov_model_params);
889
890 if (typeid_cast<const DataTypeFixedString *>(&data_type))
891 return std::make_unique<FixedStringModel>(seed);
892
893 if (auto type = typeid_cast<const DataTypeArray *>(&data_type))
894 return std::make_unique<ArrayModel>(get(*type->getNestedType(), seed, markov_model_params));
895
896 if (auto type = typeid_cast<const DataTypeNullable *>(&data_type))
897 return std::make_unique<NullableModel>(get(*type->getNestedType(), seed, markov_model_params));
898
899 throw Exception("Unsupported data type", ErrorCodes::NOT_IMPLEMENTED);
900 }
901};
902
903
904class Obfuscator
905{
906private:
907 std::vector<ModelPtr> models;
908
909public:
910 Obfuscator(const Block & header, UInt64 seed, MarkovModelParameters markov_model_params)
911 {
912 ModelFactory factory;
913
914 size_t columns = header.columns();
915 models.reserve(columns);
916
917 for (const auto & elem : header)
918 models.emplace_back(factory.get(*elem.type, hash(seed, elem.name), markov_model_params));
919 }
920
921 void train(const Columns & columns)
922 {
923 size_t size = columns.size();
924 for (size_t i = 0; i < size; ++i)
925 models[i]->train(*columns[i]);
926 }
927
928 void finalize()
929 {
930 for (auto & model : models)
931 model->finalize();
932 }
933
934 Columns generate(const Columns & columns)
935 {
936 size_t size = columns.size();
937 Columns res(size);
938 for (size_t i = 0; i < size; ++i)
939 res[i] = models[i]->generate(*columns[i]);
940 return res;
941 }
942};
943
944}
945
946#pragma GCC diagnostic ignored "-Wunused-function"
947#pragma GCC diagnostic ignored "-Wmissing-declarations"
948
/// Entry point: parse options, build the header block from --structure,
/// then read the input twice - first pass trains the models, second pass generates obfuscated output.
int mainEntryClickHouseObfuscator(int argc, char ** argv)
try
{
    using namespace DB;
    namespace po = boost::program_options;

    /// Command line interface: all model tuning knobs are exposed as options.
    po::options_description description = createOptionsDescription("Options", getTerminalWidth());
    description.add_options()
        ("help", "produce help message")
        ("structure,S", po::value<std::string>(), "structure of the initial table (list of column and type names)")
        ("input-format", po::value<std::string>(), "input format of the initial table data")
        ("output-format", po::value<std::string>(), "default output format")
        ("seed", po::value<std::string>(), "seed (arbitrary string), must be random string with at least 10 bytes length; note that a seed for each column is derived from this seed and a column name: you can obfuscate data for different tables and as long as you use identical seed and identical column names, the data for corresponding non-text columns for different tables will be transformed in the same way, so the data for different tables can be JOINed after obfuscation")
        ("limit", po::value<UInt64>(), "if specified - stop after generating that number of rows")
        ("silent", po::value<bool>()->default_value(false), "don't print information messages to stderr")
        ("order", po::value<UInt64>()->default_value(5), "order of markov model to generate strings")
        ("frequency-cutoff", po::value<UInt64>()->default_value(5), "frequency cutoff for markov model: remove all buckets with count less than specified")
        ("num-buckets-cutoff", po::value<UInt64>()->default_value(0), "cutoff for number of different possible continuations for a context: remove all histograms with less than specified number of buckets")
        ("frequency-add", po::value<UInt64>()->default_value(0), "add a constant to every count to lower probability distribution skew")
        ("frequency-desaturate", po::value<double>()->default_value(0), "0..1 - move every frequency towards average to lower probability distribution skew")
        ("determinator-sliding-window-size", po::value<UInt64>()->default_value(8), "size of a sliding window in a source string - its hash is used as a seed for RNG in markov model")
        ;

    po::parsed_options parsed = po::command_line_parser(argc, argv).options(description).run();
    po::variables_map options;
    po::store(parsed, options);

    /// Print help and exit if requested, or if any mandatory option is missing.
    if (options.count("help")
        || !options.count("seed")
        || !options.count("structure")
        || !options.count("input-format")
        || !options.count("output-format"))
    {
        std::cout << documantation << "\n"
            << "\nUsage: " << argv[0] << " [options] < in > out\n"
            << "\nInput must be seekable file (it will be read twice).\n"
            << "\n" << description << "\n"
            << "\nExample:\n    " << argv[0] << " --seed \"$(head -c16 /dev/urandom | base64)\" --input-format TSV --output-format TSV --structure 'CounterID UInt32, URLDomain String, URL String, SearchPhrase String, Title String' < stats.tsv\n";
        return 0;
    }

    /// The textual seed is hashed into the 64-bit seed actually used by the models.
    UInt64 seed = sipHash64(options["seed"].as<std::string>());

    std::string structure = options["structure"].as<std::string>();
    std::string input_format = options["input-format"].as<std::string>();
    std::string output_format = options["output-format"].as<std::string>();

    std::optional<UInt64> limit;
    if (options.count("limit"))
        limit = options["limit"].as<UInt64>();

    bool silent = options["silent"].as<bool>();

    /// Markov model parameters (used for String columns).
    MarkovModelParameters markov_model_params;

    markov_model_params.order = options["order"].as<UInt64>();
    markov_model_params.frequency_cutoff = options["frequency-cutoff"].as<UInt64>();
    markov_model_params.num_buckets_cutoff = options["num-buckets-cutoff"].as<UInt64>();
    markov_model_params.frequency_add = options["frequency-add"].as<UInt64>();
    markov_model_params.frequency_desaturate = options["frequency-desaturate"].as<double>();
    markov_model_params.determinator_sliding_window_size = options["determinator-sliding-window-size"].as<UInt64>();

    // Create header block
    /// --structure is a flat, comma/space separated list: name1 type1 name2 type2 ...
    std::vector<std::string> structure_vals;
    boost::split(structure_vals, structure, boost::algorithm::is_any_of(" ,"), boost::algorithm::token_compress_on);

    if (structure_vals.size() % 2 != 0)
        throw Exception("Odd number of elements in section structure: must be a list of name type pairs", ErrorCodes::LOGICAL_ERROR);

    Block header;
    const DataTypeFactory & data_type_factory = DataTypeFactory::instance();

    for (size_t i = 0, size = structure_vals.size(); i < size; i += 2)
    {
        ColumnWithTypeAndName column;
        column.name = structure_vals[i];
        column.type = data_type_factory.get(structure_vals[i + 1]);
        column.column = column.type->createColumn();
        header.insert(std::move(column));
    }

    Context context = Context::createGlobal();
    context.makeGlobalContext();

    ReadBufferFromFileDescriptor file_in(STDIN_FILENO);
    WriteBufferFromFileDescriptor file_out(STDOUT_FILENO);

    {
        /// stdin must be seekable: the input is read twice (train pass and generate pass).
        auto res = lseek(file_in.getFD(), 0, SEEK_SET);
        if (-1 == res)
            throwFromErrno("Input must be seekable file (it will be read twice).", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
    }

    Obfuscator obfuscator(header, seed, markov_model_params);

    UInt64 max_block_size = 8192;

    /// Train step
    {
        if (!silent)
            std::cerr << "Training models\n";

        BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size);

        UInt64 processed_rows = 0;
        input->readPrefix();
        while (Block block = input->read())
        {
            obfuscator.train(block.getColumns());
            processed_rows += block.rows();
            if (!silent)
                std::cerr << "Processed " << processed_rows << " rows\n";
        }
        input->readSuffix();
    }

    obfuscator.finalize();

    /// Generation step
    {
        if (!silent)
            std::cerr << "Generating data\n";

        /// Rewind and read the same input again, now transforming each block.
        file_in.seek(0);

        BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size);
        BlockOutputStreamPtr output = context.getOutputFormat(output_format, file_out, header);

        if (limit)
            input = std::make_shared<LimitBlockInputStream>(input, *limit, 0);

        UInt64 processed_rows = 0;
        input->readPrefix();
        output->writePrefix();
        while (Block block = input->read())
        {
            Columns columns = obfuscator.generate(block.getColumns());
            output->write(header.cloneWithColumns(columns));
            processed_rows += block.rows();
            if (!silent)
                std::cerr << "Processed " << processed_rows << " rows\n";
        }
        output->writeSuffix();
        input->readSuffix();
    }

    return 0;
}
/// Top-level exception boundary: print the message and convert it to a process exit code.
catch (...)
{
    std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
    auto code = DB::getCurrentExceptionCode();
    return code ? code : 1;
}
1104