#include <iostream>
#include <iomanip>
#include <mutex>
#include <atomic>
#include <vector>
#include <functional>
#include <cstdlib>
#include <unistd.h>

//#define DBMS_HASH_MAP_DEBUG_RESIZES

#include <Interpreters/AggregationCommon.h>

#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/TwoLevelHashMap.h>
//#include <Common/HashTable/HashTableWithSmallLocks.h>
//#include <Common/HashTable/HashTableMerge.h>

#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>

#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>


using Key = UInt64;
using Value = UInt64;

using Source = std::vector<Key>;

using Map = HashMap<Key, Value>;
using MapTwoLevel = TwoLevelHashMap<Key, Value>;


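/// A non-blocking lock built on a single atomic int: try_lock() attempts one
/// compare-and-swap of 0 -> 1 and never waits; unlock() stores 0 with release semantics.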
struct SmallLock
{
    std::atomic<int> locked {false};

    bool try_lock()
    {
        int expected = 0;
        return locked.compare_exchange_strong(expected, 1, std::memory_order_acquire);
    }

    void unlock()
    {
        locked.store(0, std::memory_order_release);
    }
};

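/// The same lock padded to a full 64-byte cache line, so that adjacent locks in an array
/// do not share a cache line (avoids false sharing between threads).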
struct __attribute__((__aligned__(64))) AlignedSmallLock : public SmallLock
{
    char dummy[64 - sizeof(SmallLock)];
};


using Mutex = std::mutex;


/*using MapSmallLocks = HashTableWithSmallLocks<
    Key,
    HashTableCellWithLock<
        Key,
        HashMapCell<Key, Value, DefaultHash<Key>> >,
    DefaultHash<Key>,
    HashTableGrower<21>,
    HashTableAllocator>;*/


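/// Plain aggregation of one thread's share of the data into a single-level hash map
/// (the kernel of options 1 and 11).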
static void aggregate1(Map & map, Source::const_iterator begin, Source::const_iterator end)
{
    for (auto it = begin; it != end; ++it)
        ++map[*it];
}

#if !defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
#endif

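/// Like aggregate1, but with a fast path for runs of identical consecutive keys:
/// the cell found for the previous key is reused instead of doing another lookup.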
static void aggregate12(Map & map, Source::const_iterator begin, Source::const_iterator end)
{
    Map::LookupResult found = nullptr;
    auto prev_it = end;
    for (auto it = begin; it != end; ++it)
    {
        if (prev_it != end && *it == *prev_it)
        {
            ++found->getMapped();
            continue;
        }
        prev_it = it;

        bool inserted;
        map.emplace(*it, found, inserted);
        ++found->getMapped();
    }
}

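/// Plain aggregation into a two-level (bucketed) hash map (the kernel of option 2).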
static void aggregate2(MapTwoLevel & map, Source::const_iterator begin, Source::const_iterator end)
{
    for (auto it = begin; it != end; ++it)
        ++map[*it];
}

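/// Like aggregate2, but with the same consecutive-duplicates fast path as aggregate12.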
static void aggregate22(MapTwoLevel & map, Source::const_iterator begin, Source::const_iterator end)
{
    MapTwoLevel::LookupResult found = nullptr;
    auto prev_it = end;
    for (auto it = begin; it != end; ++it)
    {
        /// Guard against dereferencing the end iterator on the first iteration.
        if (prev_it != end && *it == *prev_it)
        {
            ++found->getMapped();
            continue;
        }
        prev_it = it;

        bool inserted;
        map.emplace(*it, found, inserted);
        ++found->getMapped();
    }
}

#if !defined(__clang__)
#pragma GCC diagnostic pop
#endif

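/// Merge one first-level bucket of every thread's two-level table into maps[0].
/// Buckets are independent, which is what allows the merge to be parallelized.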
static void merge2(MapTwoLevel * maps, size_t num_threads, size_t bucket)
{
    for (size_t i = 1; i < num_threads; ++i)
        for (auto it = maps[i].impls[bucket].begin(); it != maps[i].impls[bucket].end(); ++it)
            maps[0].impls[bucket][it->getKey()] += it->getMapped();
}

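/// Option 3 kernel: aggregate into a small local map; once it reaches the threshold,
/// send misses to the shared global map under a mutex, falling back to the local map
/// whenever the mutex is contended.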
static void aggregate3(Map & local_map, Map & global_map, Mutex & mutex, Source::const_iterator begin, Source::const_iterator end)
{
    static constexpr size_t threshold = 65536;

    for (auto it = begin; it != end; ++it)
    {
        auto found = local_map.find(*it);

        if (found)
            ++found->getMapped();
        else if (local_map.size() < threshold)
            ++local_map[*it];    /// TODO You could do one lookup, not two.
        else
        {
            if (mutex.try_lock())
            {
                ++global_map[*it];
                mutex.unlock();
            }
            else
                ++local_map[*it];
        }
    }
}

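/// Option 33 kernel: aggregate into a local map; every time it grows to the threshold,
/// flush its contents into the global map under a mutex and start over.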
static void aggregate33(Map & local_map, Map & global_map, Mutex & mutex, Source::const_iterator begin, Source::const_iterator end)
{
    static constexpr size_t threshold = 65536;

    for (auto it = begin; it != end; ++it)
    {
        Map::LookupResult found;
        bool inserted;
        local_map.emplace(*it, found, inserted);
        ++found->getMapped();

        if (inserted && local_map.size() == threshold)
        {
            std::lock_guard<Mutex> lock(mutex);
            for (auto & value_type : local_map)
                global_map[value_type.getKey()] += value_type.getMapped();

            local_map.clear();
        }
    }
}

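/// Option 4 kernel: process the input in blocks; while the local map is small, fill it,
/// otherwise route misses into one of the global two-level buckets, each guarded by its own
/// mutex, and fall back to the local map if that bucket's mutex is busy.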
static void aggregate4(Map & local_map, MapTwoLevel & global_map, Mutex * mutexes, Source::const_iterator begin, Source::const_iterator end)
{
    static constexpr size_t threshold = 65536;
    static constexpr size_t block_size = 8192;

    auto it = begin;
    while (it != end)
    {
        /// Clamp the step so the iterator is never advanced past `end`.
        auto block_end = it + std::min(block_size, static_cast<size_t>(end - it));

        if (local_map.size() < threshold)
        {
            for (; it != block_end; ++it)
                ++local_map[*it];
        }
        else
        {
            for (; it != block_end; ++it)
            {
                auto found = local_map.find(*it);

                if (found)
                    ++found->getMapped();
                else
                {
                    size_t hash_value = global_map.hash(*it);
                    size_t bucket = global_map.getBucketFromHash(hash_value);

                    if (mutexes[bucket].try_lock())
                    {
                        ++global_map.impls[bucket][*it];
                        mutexes[bucket].unlock();
                    }
                    else
                        ++local_map[*it];
                }
            }
        }
    }
}
/*
void aggregate5(Map & local_map, MapSmallLocks & global_map, Source::const_iterator begin, Source::const_iterator end)
{
    static constexpr size_t threshold = 65536;

    for (auto it = begin; it != end; ++it)
    {
        Map::iterator found = local_map.find(*it);

        if (found != local_map.end())
            ++found->second;
        else if (local_map.size() < threshold)
            ++local_map[*it];    /// TODO You could do one lookup, not two.
        else
        {
            SmallScopedLock lock;
            MapSmallLocks::iterator insert_it;
            bool inserted;

            if (global_map.tryEmplace(*it, insert_it, inserted, lock))
                ++insert_it->second;
            else
                ++local_map[*it];
        }
    }
}*/


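/// Reads an array of n UInt64 keys from a compressed stream on stdin and benchmarks
/// several parallel aggregation strategies; `method` selects one of them (0 runs them all).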
int main(int argc, char ** argv)
{
    if (argc < 3)
    {
        std::cerr << "Usage: " << argv[0] << " n num_threads [method] < compressed_data" << std::endl;
        return 1;
    }

    size_t n = atoi(argv[1]);
    size_t num_threads = atoi(argv[2]);
    size_t method = argc <= 3 ? 0 : atoi(argv[3]);

    std::cerr << std::fixed << std::setprecision(2);

    ThreadPool pool(num_threads);

    Source data(n);

    {
        Stopwatch watch;
        DB::ReadBufferFromFileDescriptor in1(STDIN_FILENO);
        DB::CompressedReadBuffer in2(in1);

        in2.readStrict(reinterpret_cast<char *>(data.data()), sizeof(data[0]) * n);

        watch.stop();
        std::cerr << std::fixed << std::setprecision(2)
            << "Vector. Size: " << n
            << ", elapsed: " << watch.elapsedSeconds()
            << " (" << n / watch.elapsedSeconds() << " elem/sec.)"
            << std::endl << std::endl;
    }

    if (!method || method == 1)
    {
        /** Option 1.
          * In different threads, we aggregate independently into different hash tables.
          * Then merge them together.
          */

        std::vector<Map> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate1,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        for (size_t i = 1; i < num_threads; ++i)
            for (auto it = maps[i].begin(); it != maps[i].end(); ++it)
                maps[0][it->getKey()] += it->getMapped();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;
        std::cerr << "Size: " << maps[0].size() << std::endl << std::endl;
    }

    if (!method || method == 12)
    {
        /** Option 12.
          * Same as option 1, but with the optimization for consecutive identical values.
          */

        std::vector<Map> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate12,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        for (size_t i = 1; i < num_threads; ++i)
            for (auto it = maps[i].begin(); it != maps[i].end(); ++it)
                maps[0][it->getKey()] += it->getMapped();

        watch.stop();

        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;
        std::cerr << "Size: " << maps[0].size() << std::endl << std::endl;
    }

    if (!method || method == 11)
    {
        /** Option 11.
          * Same as option 1, but during the merge the order of the loops is changed,
          * which could potentially give better cache locality.
          *
          * In practice, there is no difference.
          */

        std::vector<Map> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate1,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        std::vector<Map::iterator> iterators(num_threads);
        for (size_t i = 1; i < num_threads; ++i)
            iterators[i] = maps[i].begin();

        while (true)
        {
            bool finish = true;
            for (size_t i = 1; i < num_threads; ++i)
            {
                if (iterators[i] == maps[i].end())
                    continue;

                finish = false;
                maps[0][iterators[i]->getKey()] += iterators[i]->getMapped();
                ++iterators[i];
            }

            if (finish)
                break;
        }

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;
        std::cerr << "Size: " << maps[0].size() << std::endl << std::endl;
    }

    if (!method || method == 2)
    {
        /** Option 2.
          * In different threads, we aggregate independently into different two-level hash tables.
          * Then merge them together, parallelizing over the first-level buckets.
          * With large hash tables (10 million elements or more) and many threads (8-32),
          * the single-threaded merge becomes the bottleneck, and this option gives
          * a performance advantage of about 4 times.
          */

        std::vector<MapTwoLevel> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate2,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
            pool.scheduleOrThrowOnError(std::bind(merge2,
                maps.data(), num_threads, i));

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << maps[0].size() << std::endl << std::endl;
    }

    if (!method || method == 22)
    {
        std::vector<MapTwoLevel> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate22,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
            pool.scheduleOrThrowOnError(std::bind(merge2, maps.data(), num_threads, i));

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << maps[0].size() << std::endl << std::endl;
    }

    if (!method || method == 3)
    {
        /** Option 3.
          * In different threads, we aggregate independently into different hash tables,
          * until their size becomes large enough.
          * If the local hash table has reached the size limit and the key is not in it,
          * then we insert the key into a single global hash table protected by a mutex;
          * if the mutex cannot be acquired, we insert into the local table instead.
          * Then merge all the local hash tables into the global one.
          * This method is bad: a lot of contention.
          */

        std::vector<Map> local_maps(num_threads);
        Map global_map;
        Mutex mutex;

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate3,
                std::ref(local_maps[i]),
                std::ref(global_map),
                std::ref(mutex),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes (local): ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size();
            size_before_merge += local_maps[i].size();
        }
        std::cerr << std::endl;
        std::cerr << "Size (global): " << global_map.size() << std::endl;
        size_before_merge += global_map.size();

        watch.restart();

        for (size_t i = 0; i < num_threads; ++i)
            for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it)
                global_map[it->getKey()] += it->getMapped();

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << global_map.size() << std::endl << std::endl;
    }

    if (!method || method == 33)
    {
        /** Option 33.
          * In different threads, we aggregate independently into different hash tables,
          * until their size becomes large enough.
          * Then we flush the accumulated data into the global hash table, protected by a mutex, and continue.
          */

        std::vector<Map> local_maps(num_threads);
        Map global_map;
        Mutex mutex;

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate33,
                std::ref(local_maps[i]),
                std::ref(global_map),
                std::ref(mutex),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes (local): ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size();
            size_before_merge += local_maps[i].size();
        }
        std::cerr << std::endl;
        std::cerr << "Size (global): " << global_map.size() << std::endl;
        size_before_merge += global_map.size();

        watch.restart();

        for (size_t i = 0; i < num_threads; ++i)
            for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it)
                global_map[it->getKey()] += it->getMapped();

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << global_map.size() << std::endl << std::endl;
    }

    if (!method || method == 4)
    {
        /** Option 4.
          * In different threads, we aggregate independently into different hash tables,
          * until their size becomes large enough.
          * If the local hash table has reached the size limit and the key is not in it,
          * then insert it into one of 256 global hash tables, each protected by its own mutex.
          * Then merge all the local hash tables into the global one.
          * This method is not so bad with a lot of threads, but worse than the second one.
          */

        std::vector<Map> local_maps(num_threads);
        MapTwoLevel global_map;
        std::vector<Mutex> mutexes(MapTwoLevel::NUM_BUCKETS);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate4,
                std::ref(local_maps[i]),
                std::ref(global_map),
                mutexes.data(),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes (local): ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size();
            size_before_merge += local_maps[i].size();
        }
        std::cerr << std::endl;

        size_t sum_size = global_map.size();
        std::cerr << "Size (global): " << sum_size << std::endl;
        size_before_merge += sum_size;

        watch.restart();

        for (size_t i = 0; i < num_threads; ++i)
            for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it)
                global_map[it->getKey()] += it->getMapped();

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << global_map.size() << std::endl << std::endl;
    }

/*  if (!method || method == 5)
    {
    */  /** Option 5.
          * In different threads, we aggregate independently into different hash tables,
          * until their size becomes large enough.
          * If the local hash table is large and the key is not in it,
          * then insert it into a single global hash table that contains small latches in each cell;
          * if the latch cannot be acquired, insert into the local table instead.
          * Then merge all the local hash tables into the global one.
          */
/*
        Map local_maps[num_threads];
        MapSmallLocks global_map;

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate5,
                std::ref(local_maps[i]),
                std::ref(global_map),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes (local): ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size();
            size_before_merge += local_maps[i].size();
        }
        std::cerr << std::endl;
        std::cerr << "Size (global): " << global_map.size() << std::endl;
        size_before_merge += global_map.size();

        watch.restart();

        for (size_t i = 0; i < num_threads; ++i)
            for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it)
                global_map.insert(std::make_pair(it->first, 0)).first->second += it->second;

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << global_map.size() << std::endl << std::endl;
    }*/

    /*if (!method || method == 6)
    {
        *//** Option 6.
          * In different threads, we aggregate independently into different hash tables.
          * Then "merge" them by traversing them in the same key order.
          * Quite a slow option.
          */
/*
        std::vector<Map> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate1,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        using Maps = std::vector<Map *>;
        Maps maps_to_merge(num_threads);
        for (size_t i = 0; i < num_threads; ++i)
            maps_to_merge[i] = &maps[i];

        size_t size = 0;

        for (size_t i = 0; i < 100; ++i)
            processMergedHashTables(maps_to_merge,
                [] (Map::value_type & dst, const Map::value_type & src) { dst.second += src.second; },
                [&] (const Map::value_type & dst) { ++size; });

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;
        std::cerr << "Size: " << size << std::endl << std::endl;
    }*/

    return 0;
}