1#include <iostream>
2#include <iomanip>
3#include <mutex>
4#include <atomic>
5
6//#define DBMS_HASH_MAP_DEBUG_RESIZES
7
8#include <Interpreters/AggregationCommon.h>
9
10#include <Common/HashTable/HashMap.h>
11#include <Common/HashTable/TwoLevelHashMap.h>
12//#include <Common/HashTable/HashTableWithSmallLocks.h>
13//#include <Common/HashTable/HashTableMerge.h>
14
15#include <IO/ReadBufferFromFile.h>
16#include <Compression/CompressedReadBuffer.h>
17
18#include <Common/Stopwatch.h>
19#include <Common/ThreadPool.h>
20
21
/// Type of the keys being aggregated (one key per input element).
using Key = UInt64;
/// Type of the per-key aggregate state stored in the hash maps.
using Value = UInt64;
/// The whole input column; main() fills it from stdin before any benchmark runs.
using Source = std::vector<Key>;
25
26
27template <typename Map>
28struct AggregateIndependent
29{
30 template <typename Creator, typename Updater>
31 static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector<std::unique_ptr<Map>> & results,
32 Creator && creator, Updater && updater,
33 ThreadPool & pool)
34 {
35 results.reserve(num_threads);
36 for (size_t i = 0; i < num_threads; ++i)
37 results.emplace_back(std::make_unique<Map>());
38
39 for (size_t i = 0; i < num_threads; ++i)
40 {
41 auto begin = data.begin() + (data.size() * i) / num_threads;
42 auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
43 auto & map = *results[i];
44
45 pool.scheduleOrThrowOnError([&, begin, end]()
46 {
47 for (auto it = begin; it != end; ++it)
48 {
49 typename Map::LookupResult place;
50 bool inserted;
51 map.emplace(*it, place, inserted);
52
53 if (inserted)
54 creator(place->getMapped());
55 else
56 updater(place->getMapped());
57 }
58 });
59 }
60
61 pool.wait();
62 }
63};
64
65#if !__clang__
66#pragma GCC diagnostic push
67#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
68#endif
69
70template <typename Map>
71struct AggregateIndependentWithSequentialKeysOptimization
72{
73 template <typename Creator, typename Updater>
74 static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector<std::unique_ptr<Map>> & results,
75 Creator && creator, Updater && updater,
76 ThreadPool & pool)
77 {
78 results.reserve(num_threads);
79 for (size_t i = 0; i < num_threads; ++i)
80 results.emplace_back(std::make_unique<Map>());
81
82 for (size_t i = 0; i < num_threads; ++i)
83 {
84 auto begin = data.begin() + (data.size() * i) / num_threads;
85 auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
86 auto & map = *results[i];
87
88 pool.scheduleOrThrowOnError([&, begin, end]()
89 {
90 typename Map::LookupResult place = nullptr;
91 Key prev_key {};
92 for (auto it = begin; it != end; ++it)
93 {
94 if (it != begin && *it == prev_key)
95 {
96 updater(place->getMapped());
97 continue;
98 }
99 prev_key = *it;
100
101 bool inserted;
102 map.emplace(*it, place, inserted);
103
104 if (inserted)
105 creator(place->getMapped());
106 else
107 updater(place->getMapped());
108 }
109 });
110 }
111
112 pool.wait();
113 }
114};
115
116#if !__clang__
117#pragma GCC diagnostic pop
118#endif
119
120
121template <typename Map>
122struct MergeSequential
123{
124 template <typename Merger>
125 static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
126 Merger && merger,
127 ThreadPool &)
128 {
129 for (size_t i = 1; i < num_maps; ++i)
130 {
131 auto begin = source_maps[i]->begin();
132 auto end = source_maps[i]->end();
133 for (auto it = begin; it != end; ++it)
134 merger((*source_maps[0])[it->getKey()], it->getMapped());
135 }
136
137 result_map = source_maps[0];
138 }
139};
140
141template <typename Map>
142struct MergeSequentialTransposed /// In practice not better than usual.
143{
144 template <typename Merger>
145 static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
146 Merger && merger,
147 ThreadPool &)
148 {
149 std::vector<typename Map::iterator> iterators(num_maps);
150 for (size_t i = 1; i < num_maps; ++i)
151 iterators[i] = source_maps[i]->begin();
152
153 result_map = source_maps[0];
154
155 while (true)
156 {
157 bool finish = true;
158 for (size_t i = 1; i < num_maps; ++i)
159 {
160 if (iterators[i] == source_maps[i]->end())
161 continue;
162
163 finish = false;
164 merger((*result_map)[iterators[i]->getKey()], iterators[i]->getMapped());
165 ++iterators[i];
166 }
167
168 if (finish)
169 break;
170 }
171 }
172};
173
174template <typename Map, typename ImplMerge>
175struct MergeParallelForTwoLevelTable
176{
177 template <typename Merger>
178 static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map,
179 Merger && merger,
180 ThreadPool & pool)
181 {
182 for (size_t bucket = 0; bucket < Map::NUM_BUCKETS; ++bucket)
183 pool.scheduleOrThrowOnError([&, bucket, num_maps]
184 {
185 std::vector<typename Map::Impl *> section(num_maps);
186 for (size_t i = 0; i < num_maps; ++i)
187 section[i] = &source_maps[i]->impls[bucket];
188
189 typename Map::Impl * res;
190 ImplMerge::execute(section.data(), num_maps, res, merger, pool);
191 });
192
193 pool.wait();
194 result_map = source_maps[0];
195 }
196};
197
198
199template <typename Map, typename Aggregate, typename Merge>
200struct Work
201{
202 template <typename Creator, typename Updater, typename Merger>
203 static void NO_INLINE execute(const Source & data, size_t num_threads,
204 Creator && creator, Updater && updater, Merger && merger,
205 ThreadPool & pool)
206 {
207 std::vector<std::unique_ptr<Map>> intermediate_results;
208
209 Stopwatch watch;
210
211 Aggregate::execute(data, num_threads, intermediate_results, std::forward<Creator>(creator), std::forward<Updater>(updater), pool);
212 size_t num_maps = intermediate_results.size();
213
214 watch.stop();
215 double time_aggregated = watch.elapsedSeconds();
216 std::cerr
217 << "Aggregated in " << time_aggregated
218 << " (" << data.size() / time_aggregated << " elem/sec.)"
219 << std::endl;
220
221 size_t size_before_merge = 0;
222 std::cerr << "Sizes: ";
223 for (size_t i = 0; i < num_threads; ++i)
224 {
225 std::cerr << (i == 0 ? "" : ", ") << intermediate_results[i]->size();
226 size_before_merge += intermediate_results[i]->size();
227 }
228 std::cerr << std::endl;
229
230 watch.restart();
231
232 std::vector<Map*> intermediate_results_ptrs(num_maps);
233 for (size_t i = 0; i < num_maps; ++i)
234 intermediate_results_ptrs[i] = intermediate_results[i].get();
235
236 Map * result_map;
237 Merge::execute(intermediate_results_ptrs.data(), num_maps, result_map, std::forward<Merger>(merger), pool);
238
239 watch.stop();
240 double time_merged = watch.elapsedSeconds();
241 std::cerr
242 << "Merged in " << time_merged
243 << " (" << size_before_merge / time_merged << " elem/sec.)"
244 << std::endl;
245
246 double time_total = time_aggregated + time_merged;
247 std::cerr
248 << "Total in " << time_total
249 << " (" << data.size() / time_total << " elem/sec.)"
250 << std::endl;
251 std::cerr << "Size: " << result_map->size() << std::endl << std::endl;
252 }
253};
254
255
/// Single-level hash map used by methods 1-4.
using Map = HashMap<Key, Value, HashCRC32<Key>>;
/// Two-level (bucketed) hash map used by methods 5-10, 13 and 14.
using MapTwoLevel = TwoLevelHashMap<Key, Value, HashCRC32<Key>>;
/// NOTE(review): not referenced by any benchmark variant in this file; presumably left over
/// from lock-based variants (cf. the commented-out HashTableWithSmallLocks include) — confirm before removing.
using Mutex = std::mutex;
259
260
261struct Creator
262{
263 void operator()(Value &) const {}
264};
265
266#if !__clang__
267#pragma GCC diagnostic push
268#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
269#endif
270
271struct Updater
272{
273 void operator()(Value & x) const { ++x; }
274};
275
276#if !__clang__
277#pragma GCC diagnostic pop
278#endif
279
280struct Merger
281{
282 void operator()(Value & dst, const Value & src) const { dst += src; }
283};
284
285
286
287int main(int argc, char ** argv)
288{
289 size_t n = atoi(argv[1]);
290 size_t num_threads = atoi(argv[2]);
291 size_t method = argc <= 3 ? 0 : atoi(argv[3]);
292
293 std::cerr << std::fixed << std::setprecision(2);
294
295 ThreadPool pool(num_threads);
296
297 Source data(n);
298
299 {
300 Stopwatch watch;
301 DB::ReadBufferFromFileDescriptor in1(STDIN_FILENO);
302 DB::CompressedReadBuffer in2(in1);
303
304 in2.readStrict(reinterpret_cast<char*>(data.data()), sizeof(data[0]) * n);
305
306 watch.stop();
307 std::cerr << std::fixed << std::setprecision(2)
308 << "Vector. Size: " << n
309 << ", elapsed: " << watch.elapsedSeconds()
310 << " (" << n / watch.elapsedSeconds() << " elem/sec.)"
311 << std::endl << std::endl;
312 }
313
314 Creator creator;
315 Updater updater;
316 Merger merger;
317
318 if (!method || method == 1)
319 Work<
320 Map,
321 AggregateIndependent<Map>,
322 MergeSequential<Map>
323 >::execute(data, num_threads, creator, updater, merger, pool);
324
325 if (!method || method == 2)
326 Work<
327 Map,
328 AggregateIndependentWithSequentialKeysOptimization<Map>,
329 MergeSequential<Map>
330 >::execute(data, num_threads, creator, updater, merger, pool);
331
332 if (!method || method == 3)
333 Work<
334 Map,
335 AggregateIndependent<Map>,
336 MergeSequentialTransposed<Map>
337 >::execute(data, num_threads, creator, updater, merger, pool);
338
339 if (!method || method == 4)
340 Work<
341 Map,
342 AggregateIndependentWithSequentialKeysOptimization<Map>,
343 MergeSequentialTransposed<Map>
344 >::execute(data, num_threads, creator, updater, merger, pool);
345
346 if (!method || method == 5)
347 Work<
348 MapTwoLevel,
349 AggregateIndependent<MapTwoLevel>,
350 MergeSequential<MapTwoLevel>
351 >::execute(data, num_threads, creator, updater, merger, pool);
352
353 if (!method || method == 6)
354 Work<
355 MapTwoLevel,
356 AggregateIndependentWithSequentialKeysOptimization<MapTwoLevel>,
357 MergeSequential<MapTwoLevel>
358 >::execute(data, num_threads, creator, updater, merger, pool);
359
360 if (!method || method == 7)
361 Work<
362 MapTwoLevel,
363 AggregateIndependent<MapTwoLevel>,
364 MergeSequentialTransposed<MapTwoLevel>
365 >::execute(data, num_threads, creator, updater, merger, pool);
366
367 if (!method || method == 8)
368 Work<
369 MapTwoLevel,
370 AggregateIndependentWithSequentialKeysOptimization<MapTwoLevel>,
371 MergeSequentialTransposed<MapTwoLevel>
372 >::execute(data, num_threads, creator, updater, merger, pool);
373
374 if (!method || method == 9)
375 Work<
376 MapTwoLevel,
377 AggregateIndependent<MapTwoLevel>,
378 MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequential<MapTwoLevel::Impl>>
379 >::execute(data, num_threads, creator, updater, merger, pool);
380
381 if (!method || method == 10)
382 Work<
383 MapTwoLevel,
384 AggregateIndependentWithSequentialKeysOptimization<MapTwoLevel>,
385 MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequential<MapTwoLevel::Impl>>
386 >::execute(data, num_threads, creator, updater, merger, pool);
387
388 if (!method || method == 13)
389 Work<
390 MapTwoLevel,
391 AggregateIndependent<MapTwoLevel>,
392 MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequentialTransposed<MapTwoLevel::Impl>>
393 >::execute(data, num_threads, creator, updater, merger, pool);
394
395 if (!method || method == 14)
396 Work<
397 MapTwoLevel,
398 AggregateIndependentWithSequentialKeysOptimization<MapTwoLevel>,
399 MergeParallelForTwoLevelTable<MapTwoLevel, MergeSequentialTransposed<MapTwoLevel::Impl>>
400 >::execute(data, num_threads, creator, updater, merger, pool);
401
402 return 0;
403}
404