#include <iostream>
#include <iomanip>
#include <vector>
#include <mutex>
#include <atomic>

//#define DBMS_HASH_MAP_DEBUG_RESIZES

#include <Interpreters/AggregationCommon.h>

#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/TwoLevelHashMap.h>
//#include <Common/HashTable/HashTableWithSmallLocks.h>
//#include <Common/HashTable/HashTableMerge.h>

#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>

#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>


using Key = UInt64;
using Value = UInt64;

using Source = std::vector<Key>;

using Map = HashMap<Key, Value>;
using MapTwoLevel = TwoLevelHashMap<Key, Value>;

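/// A minimal lock with a non-blocking try_lock, implemented with an atomic compare-and-swap.
/// unlock releases with release semantics to pair with the acquire in try_lock.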
struct SmallLock
{
    std::atomic<int> locked {0};

    bool try_lock()
    {
        int expected = 0;
        return locked.compare_exchange_strong(expected, 1, std::memory_order_acquire);
    }

    void unlock()
    {
        locked.store(0, std::memory_order_release);
    }
};

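/// The same lock padded to a full cache line (64 bytes), so that an array of such locks
/// does not suffer from false sharing between neighbouring elements.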
struct __attribute__((__aligned__(64))) AlignedSmallLock : public SmallLock
{
    char dummy[64 - sizeof(SmallLock)];
};


using Mutex = std::mutex;


/*using MapSmallLocks = HashTableWithSmallLocks<
    Key,
    HashTableCellWithLock<
        Key,
        HashMapCell<Key, Value, DefaultHash<Key>> >,
    DefaultHash<Key>,
    HashTableGrower<21>,
    HashTableAllocator>;*/


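/// Variant 1: each thread aggregates its part of the data into its own hash table.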
static void aggregate1(Map & map, Source::const_iterator begin, Source::const_iterator end)
{
    for (auto it = begin; it != end; ++it)
        ++map[*it];
}

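/// GCC cannot prove that `found` is always initialized before use in the functions below
/// and emits -Wmaybe-uninitialized; suppress the warning (emplace always sets `found`).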
#if !defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
#endif

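/// Same as aggregate1, but runs of consecutive identical keys reuse the previous lookup result.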
static void aggregate12(Map & map, Source::const_iterator begin, Source::const_iterator end)
{
    Map::LookupResult found = nullptr;
    auto prev_it = end;
    for (auto it = begin; it != end; ++it)
    {
        if (prev_it != end && *it == *prev_it)
        {
            ++found->getMapped();
            continue;
        }
        prev_it = it;

        bool inserted;
        map.emplace(*it, found, inserted);
        ++found->getMapped();
    }
}

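/// Same as aggregate1, but into a two-level hash table.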
static void aggregate2(MapTwoLevel & map, Source::const_iterator begin, Source::const_iterator end)
{
    for (auto it = begin; it != end; ++it)
        ++map[*it];
}

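/// Same as aggregate12 (consecutive-duplicates optimization), but into a two-level hash table.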
static void aggregate22(MapTwoLevel & map, Source::const_iterator begin, Source::const_iterator end)
{
    MapTwoLevel::LookupResult found = nullptr;
    auto prev_it = end;
    for (auto it = begin; it != end; ++it)
    {
        if (prev_it != end && *it == *prev_it)
        {
            ++found->getMapped();
            continue;
        }
        prev_it = it;

        bool inserted;
        map.emplace(*it, found, inserted);
        ++found->getMapped();
    }
}

#if !defined(__clang__)
#pragma GCC diagnostic pop
#endif

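/// Merge one bucket: add bucket `bucket` of every other thread's two-level table into the same bucket of table 0.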
static void merge2(MapTwoLevel * maps, size_t num_threads, size_t bucket)
{
    for (size_t i = 1; i < num_threads; ++i)
        for (auto it = maps[i].impls[bucket].begin(); it != maps[i].impls[bucket].end(); ++it)
            maps[0].impls[bucket][it->getKey()] += it->getMapped();
}

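/// Aggregate into the local table while it is small; once it reaches the threshold, keys that are
/// not yet in it go to the shared global table if its mutex can be taken without blocking,
/// otherwise they fall back into the local table.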
static void aggregate3(Map & local_map, Map & global_map, Mutex & mutex, Source::const_iterator begin, Source::const_iterator end)
{
    static constexpr size_t threshold = 65536;

    for (auto it = begin; it != end; ++it)
    {
        auto found = local_map.find(*it);

        if (found)
            ++found->getMapped();
        else if (local_map.size() < threshold)
            ++local_map[*it];    /// TODO You could do one lookup, not two.
        else
        {
            if (mutex.try_lock())
            {
                ++global_map[*it];
                mutex.unlock();
            }
            else
                ++local_map[*it];
        }
    }
}

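/// Aggregate into the local table; every time it grows to the threshold, flush its contents
/// into the global table under the mutex and start filling the local table again.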
static void aggregate33(Map & local_map, Map & global_map, Mutex & mutex, Source::const_iterator begin, Source::const_iterator end)
{
    static constexpr size_t threshold = 65536;

    for (auto it = begin; it != end; ++it)
    {
        Map::LookupResult found;
        bool inserted;
        local_map.emplace(*it, found, inserted);
        ++found->getMapped();

        if (inserted && local_map.size() == threshold)
        {
            std::lock_guard<Mutex> lock(mutex);
            for (auto & value_type : local_map)
                global_map[value_type.getKey()] += value_type.getMapped();

            local_map.clear();
        }
    }
}

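/// Process the input in blocks. While the local table is small, everything goes into it;
/// afterwards, keys missing from it go into the corresponding bucket of the global two-level
/// table, but only if that bucket's mutex can be taken without blocking.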
static void aggregate4(Map & local_map, MapTwoLevel & global_map, Mutex * mutexes, Source::const_iterator begin, Source::const_iterator end)
{
    static constexpr size_t threshold = 65536;
    static constexpr size_t block_size = 8192;

    auto it = begin;
    while (it != end)
    {
        /// Do not form an iterator past the end of the source.
        auto block_end = it + std::min<ptrdiff_t>(block_size, end - it);

        if (local_map.size() < threshold)
        {
            for (; it != block_end; ++it)
                ++local_map[*it];
        }
        else
        {
            for (; it != block_end; ++it)
            {
                auto found = local_map.find(*it);

                if (found)
                    ++found->getMapped();
                else
                {
                    size_t hash_value = global_map.hash(*it);
                    size_t bucket = global_map.getBucketFromHash(hash_value);

                    if (mutexes[bucket].try_lock())
                    {
                        ++global_map.impls[bucket][*it];
                        mutexes[bucket].unlock();
                    }
                    else
                        ++local_map[*it];
                }
            }
        }
    }
}
/*
void aggregate5(Map & local_map, MapSmallLocks & global_map, Source::const_iterator begin, Source::const_iterator end)
{
    static constexpr size_t threshold = 65536;

    for (auto it = begin; it != end; ++it)
    {
        Map::iterator found = local_map.find(*it);

        if (found != local_map.end())
            ++found->second;
        else if (local_map.size() < threshold)
            ++local_map[*it];    /// TODO You could do one lookup, not two.
        else
        {
            SmallScopedLock lock;
            MapSmallLocks::iterator insert_it;
            bool inserted;

            if (global_map.tryEmplace(*it, insert_it, inserted, lock))
                ++insert_it->second;
            else
                ++local_map[*it];
        }
    }
}*/


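/** Benchmark of different ways to parallelize GROUP BY-style aggregation (counting occurrences of each key).
  *
  * Usage: <program> n num_threads [method]
  * Reads n compressed UInt64 keys from stdin; `method` selects one of the variants below
  * (0 or no third argument runs all of them).
  */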
int main(int argc, char ** argv)
{
    if (argc < 3)
    {
        std::cerr << "Usage: " << argv[0] << " n num_threads [method]" << std::endl;
        return 1;
    }

    size_t n = atoi(argv[1]);
    size_t num_threads = atoi(argv[2]);
    size_t method = argc <= 3 ? 0 : atoi(argv[3]);

    std::cerr << std::fixed << std::setprecision(2);

    ThreadPool pool(num_threads);

    Source data(n);

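    /// Read the source keys from a compressed stream on stdin.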
    {
        Stopwatch watch;
        DB::ReadBufferFromFileDescriptor in1(STDIN_FILENO);
        DB::CompressedReadBuffer in2(in1);

        in2.readStrict(reinterpret_cast<char *>(data.data()), sizeof(data[0]) * n);

        watch.stop();
        std::cerr << std::fixed << std::setprecision(2)
            << "Vector. Size: " << n
            << ", elapsed: " << watch.elapsedSeconds()
            << " (" << n / watch.elapsedSeconds() << " elem/sec.)"
            << std::endl << std::endl;
    }

    if (!method || method == 1)
    {
        /** Option 1.
          * In different threads, we aggregate independently into different hash tables.
          * Then merge them together.
          */

        std::vector<Map> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate1,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        for (size_t i = 1; i < num_threads; ++i)
            for (auto it = maps[i].begin(); it != maps[i].end(); ++it)
                maps[0][it->getKey()] += it->getMapped();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;
        std::cerr << "Size: " << maps[0].size() << std::endl << std::endl;
    }

    if (!method || method == 12)
    {
        /** Option 12.
          * Same as option 1, but with an optimization for runs of consecutive identical values.
          */

        std::vector<Map> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate12,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        for (size_t i = 1; i < num_threads; ++i)
            for (auto it = maps[i].begin(); it != maps[i].end(); ++it)
                maps[0][it->getKey()] += it->getMapped();

        watch.stop();

        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;
        std::cerr << "Size: " << maps[0].size() << std::endl << std::endl;
    }

    if (!method || method == 11)
    {
        /** Option 11.
          * Same as option 1, but during the merge the loop order is swapped
          * (round-robin over the source tables), which could potentially give better cache locality.
          *
          * In practice, there is no difference.
          */

        std::vector<Map> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate1,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        std::vector<Map::iterator> iterators(num_threads);
        for (size_t i = 1; i < num_threads; ++i)
            iterators[i] = maps[i].begin();

        while (true)
        {
            bool finish = true;
            for (size_t i = 1; i < num_threads; ++i)
            {
                if (iterators[i] == maps[i].end())
                    continue;

                finish = false;
                maps[0][iterators[i]->getKey()] += iterators[i]->getMapped();
                ++iterators[i];
            }

            if (finish)
                break;
        }

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;
        std::cerr << "Size: " << maps[0].size() << std::endl << std::endl;
    }

    if (!method || method == 2)
    {
        /** Option 2.
          * In different threads, we aggregate independently into different two-level hash tables.
          * Then merge them together, parallelizing over the first-level buckets.
          * With large hash tables (10 million elements or more) and a large number of threads (8-32),
          * the merge in option 1 becomes the bottleneck, and this option is up to 4 times faster.
          */

        std::vector<MapTwoLevel> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate2,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
            pool.scheduleOrThrowOnError(std::bind(merge2,
                maps.data(), num_threads, i));

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << maps[0].size() << std::endl << std::endl;
    }

    if (!method || method == 22)
    {
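        /** Option 22.
          * Same as option 2, but aggregation uses the optimization for runs of consecutive identical keys.
          */
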
        std::vector<MapTwoLevel> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate22,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
            pool.scheduleOrThrowOnError(std::bind(merge2, maps.data(), num_threads, i));

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << maps[0].size() << std::endl << std::endl;
    }

    if (!method || method == 3)
    {
        /** Option 3.
          * In different threads, we aggregate independently into different hash tables
          * until they grow large enough.
          * Once a local hash table is large, keys that are not yet in it are inserted into a single
          * global hash table protected by a mutex; if the mutex cannot be acquired without blocking,
          * the key goes into the local table instead.
          * Then all the local hash tables are merged into the global one.
          * This method is bad: there is a lot of contention.
          */

        std::vector<Map> local_maps(num_threads);
        Map global_map;
        Mutex mutex;

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate3,
                std::ref(local_maps[i]),
                std::ref(global_map),
                std::ref(mutex),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes (local): ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size();
            size_before_merge += local_maps[i].size();
        }
        std::cerr << std::endl;
        std::cerr << "Size (global): " << global_map.size() << std::endl;
        size_before_merge += global_map.size();

        watch.restart();

        for (size_t i = 0; i < num_threads; ++i)
            for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it)
                global_map[it->getKey()] += it->getMapped();

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << global_map.size() << std::endl << std::endl;
    }

    if (!method || method == 33)
    {
        /** Option 33.
          * In different threads, we aggregate independently into different hash tables
          * until they grow large enough.
          * Then we flush the data into the global hash table, protected by a mutex, and continue.
          */

        std::vector<Map> local_maps(num_threads);
        Map global_map;
        Mutex mutex;

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate33,
                std::ref(local_maps[i]),
                std::ref(global_map),
                std::ref(mutex),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes (local): ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size();
            size_before_merge += local_maps[i].size();
        }
        std::cerr << std::endl;
        std::cerr << "Size (global): " << global_map.size() << std::endl;
        size_before_merge += global_map.size();

        watch.restart();

        for (size_t i = 0; i < num_threads; ++i)
            for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it)
                global_map[it->getKey()] += it->getMapped();

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << global_map.size() << std::endl << std::endl;
    }

    if (!method || method == 4)
    {
        /** Option 4.
          * In different threads, we aggregate independently into different hash tables
          * until they grow large enough.
          * Once a local hash table is large, keys that are not yet in it are inserted into
          * one of 256 global hash tables, each protected by its own mutex.
          * Then all the local hash tables are merged into the global one.
          * This method is not that bad with many threads, but worse than the second one.
          */

        std::vector<Map> local_maps(num_threads);
        MapTwoLevel global_map;
        std::vector<Mutex> mutexes(MapTwoLevel::NUM_BUCKETS);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate4,
                std::ref(local_maps[i]),
                std::ref(global_map),
                mutexes.data(),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes (local): ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size();
            size_before_merge += local_maps[i].size();
        }
        std::cerr << std::endl;

        size_t sum_size = global_map.size();
        std::cerr << "Size (global): " << sum_size << std::endl;
        size_before_merge += sum_size;

        watch.restart();

        for (size_t i = 0; i < num_threads; ++i)
            for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it)
                global_map[it->getKey()] += it->getMapped();

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << global_map.size() << std::endl << std::endl;
    }

/*  if (!method || method == 5)
    {
*/  /** Option 5.
      * In different threads, we aggregate independently into different hash tables
      * until they grow large enough.
      * Once a local hash table is large, keys that are not yet in it are inserted into one global
      * hash table containing a small latch in each cell, and if the latch cannot be acquired,
      * the key goes into the local table instead.
      * Then all the local hash tables are merged into the global one.
      */
/*
        Map local_maps[num_threads];
        MapSmallLocks global_map;

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate5,
                std::ref(local_maps[i]),
                std::ref(global_map),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes (local): ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << local_maps[i].size();
            size_before_merge += local_maps[i].size();
        }
        std::cerr << std::endl;
        std::cerr << "Size (global): " << global_map.size() << std::endl;
        size_before_merge += global_map.size();

        watch.restart();

        for (size_t i = 0; i < num_threads; ++i)
            for (auto it = local_maps[i].begin(); it != local_maps[i].end(); ++it)
                global_map.insert(std::make_pair(it->first, 0)).first->second += it->second;

        pool.wait();

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;

        std::cerr << "Size: " << global_map.size() << std::endl << std::endl;
    }*/

/*  if (!method || method == 6)
    {
*/  /** Option 6.
      * In different threads, we aggregate independently into different hash tables.
      * Then "merge" them by traversing them in the same key order.
      * Quite a slow option.
      */
/*
        std::vector<Map> maps(num_threads);

        Stopwatch watch;

        for (size_t i = 0; i < num_threads; ++i)
            pool.scheduleOrThrowOnError(std::bind(aggregate1,
                std::ref(maps[i]),
                data.begin() + (data.size() * i) / num_threads,
                data.begin() + (data.size() * (i + 1)) / num_threads));

        pool.wait();

        watch.stop();
        double time_aggregated = watch.elapsedSeconds();
        std::cerr
            << "Aggregated in " << time_aggregated
            << " (" << n / time_aggregated << " elem/sec.)"
            << std::endl;

        size_t size_before_merge = 0;
        std::cerr << "Sizes: ";
        for (size_t i = 0; i < num_threads; ++i)
        {
            std::cerr << (i == 0 ? "" : ", ") << maps[i].size();
            size_before_merge += maps[i].size();
        }
        std::cerr << std::endl;

        watch.restart();

        using Maps = std::vector<Map *>;
        Maps maps_to_merge(num_threads);
        for (size_t i = 0; i < num_threads; ++i)
            maps_to_merge[i] = &maps[i];

        size_t size = 0;

        for (size_t i = 0; i < 100; ++i)
            processMergedHashTables(maps_to_merge,
                [] (Map::value_type & dst, const Map::value_type & src) { dst.second += src.second; },
                [&] (const Map::value_type & dst) { ++size; });

        watch.stop();
        double time_merged = watch.elapsedSeconds();
        std::cerr
            << "Merged in " << time_merged
            << " (" << size_before_merge / time_merged << " elem/sec.)"
            << std::endl;

        double time_total = time_aggregated + time_merged;
        std::cerr
            << "Total in " << time_total
            << " (" << n / time_total << " elem/sec.)"
            << std::endl;
        std::cerr << "Size: " << size << std::endl << std::endl;
    }*/

    return 0;
}