1#include <DataTypes/DataTypeString.h>
2#include <Functions/FunctionFactory.h>
3#include <Functions/FunctionStringToString.h>
4#include <IO/WriteBufferFromVector.h>
5#include <IO/WriteBufferValidUTF8.h>
6#include <IO/WriteHelpers.h>
7#include <Poco/UTF8Encoding.h>
8
9#include <string_view>
10
11#ifdef __SSE2__
12# include <emmintrin.h>
13#endif
14
15namespace DB
16{
17namespace ErrorCodes
18{
19 extern const int ILLEGAL_COLUMN;
20}
21
22extern const UInt8 length_of_utf8_sequence[256];
23
24struct ToValidUTF8Impl
25{
26 static void toValidUTF8One(const char * begin, const char * end, WriteBuffer & write_buffer)
27 {
28 static constexpr std::string_view replacement = "\xEF\xBF\xBD";
29
30 const char * p = begin;
31 const char * valid_start = begin;
32
33 /// The last recorded character was `replacement`.
34 bool just_put_replacement = false;
35
36 auto put_valid = [&write_buffer, &just_put_replacement](const char * data, size_t len)
37 {
38 if (len == 0)
39 return;
40 just_put_replacement = false;
41 write_buffer.write(data, len);
42 };
43
44 auto put_replacement = [&write_buffer, &just_put_replacement]()
45 {
46 if (just_put_replacement)
47 return;
48 just_put_replacement = true;
49 write_buffer.write(replacement.data(), replacement.size());
50 };
51
52 while (p < end)
53 {
54#ifdef __SSE2__
55 /// Fast skip of ASCII
56 static constexpr size_t SIMD_BYTES = 16;
57 const char * simd_end = p + (end - p) / SIMD_BYTES * SIMD_BYTES;
58
59 while (p < simd_end && !_mm_movemask_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(p))))
60 p += SIMD_BYTES;
61
62 if (!(p < end))
63 break;
64#endif
65
66 size_t len = length_of_utf8_sequence[static_cast<unsigned char>(*p)];
67
68 if (len > 4)
69 {
70 /// Invalid start of sequence. Skip one byte.
71 put_valid(valid_start, p - valid_start);
72 put_replacement();
73 ++p;
74 valid_start = p;
75 }
76 else if (p + len > end)
77 {
78 /// Sequence was not fully written to this buffer.
79 break;
80 }
81 else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<const unsigned char *>(p), len))
82 {
83 /// Valid sequence.
84 p += len;
85 }
86 else
87 {
88 /// Invalid sequence. Skip just first byte.
89 put_valid(valid_start, p - valid_start);
90 put_replacement();
91 ++p;
92 valid_start = p;
93 }
94 }
95
96 put_valid(valid_start, p - valid_start);
97
98 if (p != end)
99 put_replacement();
100 }
101
102 static void vector(
103 const ColumnString::Chars & data,
104 const ColumnString::Offsets & offsets,
105 ColumnString::Chars & res_data,
106 ColumnString::Offsets & res_offsets)
107 {
108 const size_t offsets_size = offsets.size();
109 /// It can be larger than that, but we believe it is unlikely to happen.
110 res_data.resize(data.size());
111 res_offsets.resize(offsets_size);
112
113 size_t prev_offset = 0;
114 WriteBufferFromVector<ColumnString::Chars> write_buffer(res_data);
115 for (size_t i = 0; i < offsets_size; ++i)
116 {
117 const char * haystack_data = reinterpret_cast<const char *>(&data[prev_offset]);
118 const size_t haystack_size = offsets[i] - prev_offset - 1;
119 toValidUTF8One(haystack_data, haystack_data + haystack_size, write_buffer);
120 writeChar(0, write_buffer);
121 res_offsets[i] = write_buffer.count();
122 prev_offset = offsets[i];
123 }
124 write_buffer.finish();
125 }
126
127 [[noreturn]] static void vector_fixed(const ColumnString::Chars &, size_t, ColumnString::Chars &)
128 {
129 throw Exception("Column of type FixedString is not supported by toValidUTF8 function", ErrorCodes::ILLEGAL_COLUMN);
130 }
131};
132
/// Carries the name of the function as a compile-time constant.
struct NameToValidUTF8
{
    static constexpr const char * name = "toValidUTF8";
};
137using FunctionToValidUTF8 = FunctionStringToString<ToValidUTF8Impl, NameToValidUTF8>;
138
139void registerFunctionToValidUTF8(FunctionFactory & factory)
140{
141 factory.registerFunction<FunctionToValidUTF8>();
142}
143
144}
145