1 | #include <iostream> |
2 | #include <iomanip> |
3 | #include <mutex> |
4 | #include <atomic> |
5 | |
6 | //#define DBMS_HASH_MAP_DEBUG_RESIZES |
7 | |
8 | #include <Interpreters/AggregationCommon.h> |
9 | |
10 | #include <Common/HashTable/HashMap.h> |
11 | #include <Common/HashTable/TwoLevelHashMap.h> |
12 | //#include <Common/HashTable/HashTableWithSmallLocks.h> |
13 | //#include <Common/HashTable/HashTableMerge.h> |
14 | |
15 | #include <IO/ReadBufferFromFile.h> |
16 | #include <Compression/CompressedReadBuffer.h> |
17 | |
18 | #include <Common/Stopwatch.h> |
19 | #include <Common/ThreadPool.h> |
20 | |
21 | |
/// Type of the aggregation key; the input array consists of such elements.
using Key = UInt64;
/// Type of the value stored per key in the hash maps.
using Value = UInt64;
/// Input data: a flat array of keys.
using Source = std::vector<Key>;
25 | |
26 | |
27 | template <typename Map> |
28 | struct AggregateIndependent |
29 | { |
30 | template <typename Creator, typename Updater> |
31 | static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector<std::unique_ptr<Map>> & results, |
32 | Creator && creator, Updater && updater, |
33 | ThreadPool & pool) |
34 | { |
35 | results.reserve(num_threads); |
36 | for (size_t i = 0; i < num_threads; ++i) |
37 | results.emplace_back(std::make_unique<Map>()); |
38 | |
39 | for (size_t i = 0; i < num_threads; ++i) |
40 | { |
41 | auto begin = data.begin() + (data.size() * i) / num_threads; |
42 | auto end = data.begin() + (data.size() * (i + 1)) / num_threads; |
43 | auto & map = *results[i]; |
44 | |
45 | pool.scheduleOrThrowOnError([&, begin, end]() |
46 | { |
47 | for (auto it = begin; it != end; ++it) |
48 | { |
49 | typename Map::LookupResult place; |
50 | bool inserted; |
51 | map.emplace(*it, place, inserted); |
52 | |
53 | if (inserted) |
54 | creator(place->getMapped()); |
55 | else |
56 | updater(place->getMapped()); |
57 | } |
58 | }); |
59 | } |
60 | |
61 | pool.wait(); |
62 | } |
63 | }; |
64 | |
65 | #if !__clang__ |
66 | #pragma GCC diagnostic push |
67 | #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" |
68 | #endif |
69 | |
70 | template <typename Map> |
71 | struct AggregateIndependentWithSequentialKeysOptimization |
72 | { |
73 | template <typename Creator, typename Updater> |
74 | static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector<std::unique_ptr<Map>> & results, |
75 | Creator && creator, Updater && updater, |
76 | ThreadPool & pool) |
77 | { |
78 | results.reserve(num_threads); |
79 | for (size_t i = 0; i < num_threads; ++i) |
80 | results.emplace_back(std::make_unique<Map>()); |
81 | |
82 | for (size_t i = 0; i < num_threads; ++i) |
83 | { |
84 | auto begin = data.begin() + (data.size() * i) / num_threads; |
85 | auto end = data.begin() + (data.size() * (i + 1)) / num_threads; |
86 | auto & map = *results[i]; |
87 | |
88 | pool.scheduleOrThrowOnError([&, begin, end]() |
89 | { |
90 | typename Map::LookupResult place = nullptr; |
91 | Key prev_key {}; |
92 | for (auto it = begin; it != end; ++it) |
93 | { |
94 | if (it != begin && *it == prev_key) |
95 | { |
96 | updater(place->getMapped()); |
97 | continue; |
98 | } |
99 | prev_key = *it; |
100 | |
101 | bool inserted; |
102 | map.emplace(*it, place, inserted); |
103 | |
104 | if (inserted) |
105 | creator(place->getMapped()); |
106 | else |
107 | updater(place->getMapped()); |
108 | } |
109 | }); |
110 | } |
111 | |
112 | pool.wait(); |
113 | } |
114 | }; |
115 | |
116 | #if !__clang__ |
117 | #pragma GCC diagnostic pop |
118 | #endif |
119 | |
120 | |
121 | template <typename Map> |
122 | struct MergeSequential |
123 | { |
124 | template <typename Merger> |
125 | static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map, |
126 | Merger && merger, |
127 | ThreadPool &) |
128 | { |
129 | for (size_t i = 1; i < num_maps; ++i) |
130 | { |
131 | auto begin = source_maps[i]->begin(); |
132 | auto end = source_maps[i]->end(); |
133 | for (auto it = begin; it != end; ++it) |
134 | merger((*source_maps[0])[it->getKey()], it->getMapped()); |
135 | } |
136 | |
137 | result_map = source_maps[0]; |
138 | } |
139 | }; |
140 | |
141 | template <typename Map> |
142 | struct MergeSequentialTransposed /// In practice not better than usual. |
143 | { |
144 | template <typename Merger> |
145 | static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map, |
146 | Merger && merger, |
147 | ThreadPool &) |
148 | { |
149 | std::vector<typename Map::iterator> iterators(num_maps); |
150 | for (size_t i = 1; i < num_maps; ++i) |
151 | iterators[i] = source_maps[i]->begin(); |
152 | |
153 | result_map = source_maps[0]; |
154 | |
155 | while (true) |
156 | { |
157 | bool finish = true; |
158 | for (size_t i = 1; i < num_maps; ++i) |
159 | { |
160 | if (iterators[i] == source_maps[i]->end()) |
161 | continue; |
162 | |
163 | finish = false; |
164 | merger((*result_map)[iterators[i]->getKey()], iterators[i]->getMapped()); |
165 | ++iterators[i]; |
166 | } |
167 | |
168 | if (finish) |
169 | break; |
170 | } |
171 | } |
172 | }; |
173 | |
174 | template <typename Map, typename ImplMerge> |
175 | struct MergeParallelForTwoLevelTable |
176 | { |
177 | template <typename Merger> |
178 | static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map, |
179 | Merger && merger, |
180 | ThreadPool & pool) |
181 | { |
182 | for (size_t bucket = 0; bucket < Map::NUM_BUCKETS; ++bucket) |
183 | pool.scheduleOrThrowOnError([&, bucket, num_maps] |
184 | { |
185 | std::vector<typename Map::Impl *> section(num_maps); |
186 | for (size_t i = 0; i < num_maps; ++i) |
187 | section[i] = &source_maps[i]->impls[bucket]; |
188 | |
189 | typename Map::Impl * res; |
190 | ImplMerge::execute(section.data(), num_maps, res, merger, pool); |
191 | }); |
192 | |
193 | pool.wait(); |
194 | result_map = source_maps[0]; |
195 | } |
196 | }; |
197 | |
198 | |
199 | template <typename Map, typename Aggregate, typename Merge> |
200 | struct Work |
201 | { |
202 | template <typename Creator, typename Updater, typename Merger> |
203 | static void NO_INLINE execute(const Source & data, size_t num_threads, |
204 | Creator && creator, Updater && updater, Merger && merger, |
205 | ThreadPool & pool) |
206 | { |
207 | std::vector<std::unique_ptr<Map>> intermediate_results; |
208 | |
209 | Stopwatch watch; |
210 | |
211 | Aggregate::execute(data, num_threads, intermediate_results, std::forward<Creator>(creator), std::forward<Updater>(updater), pool); |
212 | size_t num_maps = intermediate_results.size(); |
213 | |
214 | watch.stop(); |
215 | double time_aggregated = watch.elapsedSeconds(); |
216 | std::cerr |
217 | << "Aggregated in " << time_aggregated |
218 | << " (" << data.size() / time_aggregated << " elem/sec.)" |
219 | << std::endl; |
220 | |
221 | size_t size_before_merge = 0; |
222 | std::cerr << "Sizes: " ; |
223 | for (size_t i = 0; i < num_threads; ++i) |
224 | { |
225 | std::cerr << (i == 0 ? "" : ", " ) << intermediate_results[i]->size(); |
226 | size_before_merge += intermediate_results[i]->size(); |
227 | } |
228 | std::cerr << std::endl; |
229 | |
230 | watch.restart(); |
231 | |
232 | std::vector<Map*> intermediate_results_ptrs(num_maps); |
233 | for (size_t i = 0; i < num_maps; ++i) |
234 | intermediate_results_ptrs[i] = intermediate_results[i].get(); |
235 | |
236 | Map * result_map; |
237 | Merge::execute(intermediate_results_ptrs.data(), num_maps, result_map, std::forward<Merger>(merger), pool); |
238 | |
239 | watch.stop(); |
240 | double time_merged = watch.elapsedSeconds(); |
241 | std::cerr |
242 | << "Merged in " << time_merged |
243 | << " (" << size_before_merge / time_merged << " elem/sec.)" |
244 | << std::endl; |
245 | |
246 | double time_total = time_aggregated + time_merged; |
247 | std::cerr |
248 | << "Total in " << time_total |
249 | << " (" << data.size() / time_total << " elem/sec.)" |
250 | << std::endl; |
251 | std::cerr << "Size: " << result_map->size() << std::endl << std::endl; |
252 | } |
253 | }; |
254 | |
255 | |
/// Single-level hash map from Key to Value, hashed with HashCRC32.
using Map = HashMap<Key, Value, HashCRC32<Key>>;
/// Two-level variant with the same key, value and hash; used by the per-bucket parallel merge.
using MapTwoLevel = TwoLevelHashMap<Key, Value, HashCRC32<Key>>;
/// NOTE(review): not referenced anywhere in the visible part of this file.
using Mutex = std::mutex;
259 | |
260 | |
/// Functor invoked by the aggregation strategies on the mapped value of a key that has just been inserted.
struct Creator
{
    /// Intentionally does nothing: the mapped value is left exactly as the map provided it.
    void operator()(Value &) const {}
};
265 | |
266 | #if !__clang__ |
267 | #pragma GCC diagnostic push |
268 | #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" |
269 | #endif |
270 | |
271 | struct Updater |
272 | { |
273 | void operator()(Value & x) const { ++x; } |
274 | }; |
275 | |
276 | #if !__clang__ |
277 | #pragma GCC diagnostic pop |
278 | #endif |
279 | |
280 | struct Merger |
281 | { |
282 | void operator()(Value & dst, const Value & src) const { dst += src; } |
283 | }; |
284 | |
285 | |
286 | |
287 | int main(int argc, char ** argv) |
288 | { |
289 | size_t n = atoi(argv[1]); |
290 | size_t num_threads = atoi(argv[2]); |
291 | size_t method = argc <= 3 ? 0 : atoi(argv[3]); |
292 | |
293 | std::cerr << std::fixed << std::setprecision(2); |
294 | |
295 | ThreadPool pool(num_threads); |
296 | |
297 | Source data(n); |
298 | |
299 | { |
300 | Stopwatch watch; |
301 | DB::ReadBufferFromFileDescriptor in1(STDIN_FILENO); |
302 | DB::CompressedReadBuffer in2(in1); |
303 | |
304 | in2.readStrict(reinterpret_cast<char*>(data.data()), sizeof(data[0]) * n); |
305 | |
306 | watch.stop(); |
307 | std::cerr << std::fixed << std::setprecision(2) |
308 | << "Vector. Size: " << n |
309 | << ", elapsed: " << watch.elapsedSeconds() |
310 | << " (" << n / watch.elapsedSeconds() << " elem/sec.)" |
311 | << std::endl << std::endl; |
312 | } |
313 | |
314 | Creator creator; |
315 | Updater updater; |
316 | Merger merger; |
317 | |
318 | if (!method || method == 1) |
319 | Work< |
320 | Map, |
321 | AggregateIndependent<Map>, |
322 | MergeSequential<Map> |
323 | >::execute(data, num_threads, creator, updater, merger, pool); |
324 | |
325 | if (!method || method == 2) |
326 | Work< |
327 | Map, |
328 | AggregateIndependentWithSequentialKeysOptimization<Map>, |
329 | MergeSequential<Map> |
330 | >::execute(data, num_threads, creator, updater, merger, pool); |
331 | |
332 | if (!method || method == 3) |
333 | Work< |
334 | Map, |
335 | AggregateIndependent<Map>, |
336 | MergeSequentialTransposed<Map> |
337 | >::execute(data, num_threads, creator, updater, merger, pool); |
338 | |
339 | if (!method || method == 4) |
340 | Work< |
341 | Map, |
342 | AggregateIndependentWithSequentialKeysOptimization<Map>, |
343 | MergeSequentialTransposed<Map> |
344 | >::execute(data, num_threads, creator, updater, merger, pool); |
345 | |
346 | if (!method || method == 5) |
347 | Work< |
348 | MapTwoLevel, |
349 | AggregateIndependent<MapTwoLevel>, |
350 | MergeSequential<MapTwoLevel> |
351 | >::execute(data, num_threads, creator, updater, merger, pool); |
352 | |
353 | if (!method || method == 6) |
354 | Work< |
355 | MapTwoLevel, |
356 | AggregateIndependentWithSequentialKeysOptimization<MapTwoLevel>, |
357 | MergeSequential<MapTwoLevel> |
358 | >::execute(data, num_threads, creator, updater, merger, pool); |
359 | |
360 | if (!method || method == 7) |
361 | Work< |
362 | MapTwoLevel, |
363 | AggregateIndependent<MapTwoLevel>, |
364 | MergeSequentialTransposed<MapTwoLevel> |
365 | >::execute(data, num_threads, creator, updater, merger, pool); |
366 | |
367 | if (!method || method == 8) |
368 | Work< |
369 | MapTwoLevel, |
370 | AggregateIndependentWithSequentialKeysOptimization<MapTwoLevel>, |
371 | MergeSequentialTransposed<MapTwoLevel> |
372 | >::execute(data, num_threads, creator, updater, merger, pool); |
373 | |
374 | if (!method || method == 9) |
375 | Work< |
376 | MapTwoLevel, |
377 | AggregateIndependent<MapTwoLevel>, |
378 | MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequential<MapTwoLevel::Impl>> |
379 | >::execute(data, num_threads, creator, updater, merger, pool); |
380 | |
381 | if (!method || method == 10) |
382 | Work< |
383 | MapTwoLevel, |
384 | AggregateIndependentWithSequentialKeysOptimization<MapTwoLevel>, |
385 | MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequential<MapTwoLevel::Impl>> |
386 | >::execute(data, num_threads, creator, updater, merger, pool); |
387 | |
388 | if (!method || method == 13) |
389 | Work< |
390 | MapTwoLevel, |
391 | AggregateIndependent<MapTwoLevel>, |
392 | MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequentialTransposed<MapTwoLevel::Impl>> |
393 | >::execute(data, num_threads, creator, updater, merger, pool); |
394 | |
395 | if (!method || method == 14) |
396 | Work< |
397 | MapTwoLevel, |
398 | AggregateIndependentWithSequentialKeysOptimization<MapTwoLevel>, |
399 | MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequentialTransposed<MapTwoLevel::Impl>> |
400 | >::execute(data, num_threads, creator, updater, merger, pool); |
401 | |
402 | return 0; |
403 | } |
404 | |