1#include "RedisDictionarySource.h"
2#include "DictionarySourceFactory.h"
3#include "DictionaryStructure.h"
4#include "registerDictionaries.h"
5
6namespace DB
7{
8 namespace ErrorCodes
9 {
10 extern const int SUPPORT_IS_DISABLED;
11 }
12
13 void registerDictionarySourceRedis(DictionarySourceFactory & factory)
14 {
15 auto createTableSource = [=](const DictionaryStructure & dict_struct,
16 const Poco::Util::AbstractConfiguration & config,
17 const String & config_prefix,
18 Block & sample_block,
19 const Context & /* context */,
20 bool /* check_config */) -> DictionarySourcePtr {
21#if USE_POCO_REDIS
22 return std::make_unique<RedisDictionarySource>(dict_struct, config, config_prefix + ".redis", sample_block);
23#else
24 UNUSED(dict_struct);
25 UNUSED(config);
26 UNUSED(config_prefix);
27 UNUSED(sample_block);
28 throw Exception{"Dictionary source of type `redis` is disabled because poco library was built without redis support.",
29 ErrorCodes::SUPPORT_IS_DISABLED};
30#endif
31 };
32 factory.registerSource("redis", createTableSource);
33 }
34
35}
36
37
38#if USE_POCO_REDIS
39
40# include <Poco/Redis/Array.h>
41# include <Poco/Redis/Client.h>
42# include <Poco/Redis/Command.h>
43# include <Poco/Redis/Type.h>
44# include <Poco/Util/AbstractConfiguration.h>
45
46# include <Common/FieldVisitors.h>
47# include <IO/WriteHelpers.h>
48
49# include "RedisBlockInputStream.h"
50
51
52namespace DB
53{
54 namespace ErrorCodes
55 {
56 extern const int UNSUPPORTED_METHOD;
57 extern const int INVALID_CONFIG_PARAMETER;
58 extern const int INTERNAL_REDIS_ERROR;
59 }
60
61
62 static const size_t max_block_size = 8192;
63
64 RedisDictionarySource::RedisDictionarySource(
65 const DictionaryStructure & dict_struct_,
66 const String & host_,
67 UInt16 port_,
68 UInt8 db_index_,
69 RedisStorageType storage_type_,
70 const Block & sample_block_)
71 : dict_struct{dict_struct_}
72 , host{host_}
73 , port{port_}
74 , db_index{db_index_}
75 , storage_type{storage_type_}
76 , sample_block{sample_block_}
77 , client{std::make_shared<Poco::Redis::Client>(host, port)}
78 {
79 if (dict_struct.attributes.size() != 1)
80 throw Exception{"Invalid number of non key columns for Redis source: " +
81 DB::toString(dict_struct.attributes.size()) + ", expected 1",
82 ErrorCodes::INVALID_CONFIG_PARAMETER};
83
84 if (storage_type == RedisStorageType::HASH_MAP)
85 {
86 if (!dict_struct.key)
87 throw Exception{"Redis source with storage type \'hash_map\' must have key",
88 ErrorCodes::INVALID_CONFIG_PARAMETER};
89
90 if (dict_struct.key->size() != 2)
91 throw Exception{"Redis source with storage type \'hash_map\' requiers 2 keys",
92 ErrorCodes::INVALID_CONFIG_PARAMETER};
93 // suppose key[0] is primary key, key[1] is secondary key
94 }
95
96 if (db_index != 0)
97 {
98 RedisCommand command("SELECT");
99 command << static_cast<Int64>(db_index);
100 String reply = client->execute<String>(command);
101 if (reply != "+OK\r\n")
102 throw Exception{"Selecting database with index " + DB::toString(db_index)
103 + " failed with reason " + reply, ErrorCodes::INTERNAL_REDIS_ERROR};
104 }
105 }
106
107
108 RedisDictionarySource::RedisDictionarySource(
109 const DictionaryStructure & dict_struct_,
110 const Poco::Util::AbstractConfiguration & config_,
111 const String & config_prefix_,
112 Block & sample_block_)
113 : RedisDictionarySource(
114 dict_struct_,
115 config_.getString(config_prefix_ + ".host"),
116 config_.getUInt(config_prefix_ + ".port"),
117 config_.getUInt(config_prefix_ + ".db_index", 0),
118 parseStorageType(config_.getString(config_prefix_ + ".storage_type", "")),
119 sample_block_)
120 {
121 }
122
123
124 RedisDictionarySource::RedisDictionarySource(const RedisDictionarySource & other)
125 : RedisDictionarySource{other.dict_struct,
126 other.host,
127 other.port,
128 other.db_index,
129 other.storage_type,
130 other.sample_block}
131 {
132 }
133
134
135 RedisDictionarySource::~RedisDictionarySource() = default;
136
137 static String storageTypeToKeyType(RedisStorageType type)
138 {
139 switch (type)
140 {
141 case RedisStorageType::SIMPLE:
142 return "string";
143 case RedisStorageType::HASH_MAP:
144 return "hash";
145 default:
146 return "none";
147 }
148
149 __builtin_unreachable();
150 }
151
152 BlockInputStreamPtr RedisDictionarySource::loadAll()
153 {
154 RedisCommand command_for_keys("KEYS");
155 command_for_keys << "*";
156
157 /// Get only keys for specified storage type.
158 auto all_keys = client->execute<RedisArray>(command_for_keys);
159 if (all_keys.isNull())
160 return std::make_shared<RedisBlockInputStream>(client, RedisArray{}, storage_type, sample_block, max_block_size);
161
162 RedisArray keys;
163 auto key_type = storageTypeToKeyType(storage_type);
164 for (auto & key : all_keys)
165 if (key_type == client->execute<String>(RedisCommand("TYPE").addRedisType(key)))
166 keys.addRedisType(std::move(key));
167
168 if (storage_type == RedisStorageType::HASH_MAP)
169 {
170 RedisArray hkeys;
171 for (const auto & key : keys)
172 {
173 RedisCommand command_for_secondary_keys("HKEYS");
174 command_for_secondary_keys.addRedisType(key);
175
176 auto secondary_keys = client->execute<RedisArray>(command_for_secondary_keys);
177
178 RedisArray primary_with_secondary;
179 primary_with_secondary.addRedisType(key);
180 for (const auto & secondary_key : secondary_keys)
181 {
182 primary_with_secondary.addRedisType(secondary_key);
183 /// Do not store more than max_block_size values for one request.
184 if (primary_with_secondary.size() == max_block_size + 1)
185 {
186 hkeys.add(std::move(primary_with_secondary));
187 primary_with_secondary.clear();
188 primary_with_secondary.addRedisType(key);
189 }
190 }
191 if (primary_with_secondary.size() > 1)
192 hkeys.add(std::move(primary_with_secondary));
193 }
194
195 keys = std::move(hkeys);
196 }
197
198 return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);
199 }
200
201
202 BlockInputStreamPtr RedisDictionarySource::loadIds(const std::vector<UInt64> & ids)
203 {
204 if (storage_type != RedisStorageType::SIMPLE)
205 throw Exception{"Cannot use loadIds with \'simple\' storage type", ErrorCodes::UNSUPPORTED_METHOD};
206
207 if (!dict_struct.id)
208 throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD};
209
210 RedisArray keys;
211
212 for (UInt64 id : ids)
213 keys << DB::toString(id);
214
215 return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);
216 }
217
218 String RedisDictionarySource::toString() const
219 {
220 return "Redis: " + host + ':' + DB::toString(port);
221 }
222
223 RedisStorageType RedisDictionarySource::parseStorageType(const String & storage_type_str)
224 {
225 if (storage_type_str == "hash_map")
226 return RedisStorageType::HASH_MAP;
227 else if (!storage_type_str.empty() && storage_type_str != "simple")
228 throw Exception("Unknown storage type " + storage_type_str + " for Redis dictionary", ErrorCodes::INVALID_CONFIG_PARAMETER);
229
230 return RedisStorageType::SIMPLE;
231 }
232}
233
234#endif
235