1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#if _MSC_VER
18 // Suppress "decorated name length exceeded, name was truncated" warning
19 #pragma warning (disable: 4503)
20#endif
21
22#include "tbb/flow_graph.h"
23#include "tbb/task_scheduler_init.h"
24#include "tbb/tick_count.h"
25#include "tbb/tbb_thread.h"
26#include "tbb/atomic.h"
27#include "tbb/spin_mutex.h"
28#include <iostream>
29#include "../../common/utility/utility.h"
30#include <cstdlib>
31#include <cstdio>
32
33// Each philosopher is an object, and is invoked in the think() function_node, the
34// eat() function_node and forward() multifunction_node.
35//
36// The graph is constructed, and each think() function_node is started with a continue_msg.
37//
38// The philosopher will think, then gather two chopsticks, eat, place the chopsticks back,
39// and if they have not completed the required number of cycles, will start to think() again
40// by sending a continue_msg to their corresponding think() function_node.
41//
42// The reserving join has as its inputs the left and right chopstick queues an a queue
43// that stores the continue_msg emitted by the function_node after think()ing is done.
44// When all three inputs are available, a tuple of the inputs will be forwarded to the
45// eat() function_node. The output of the eat() function_node is sent to the forward()
46// multifunction_node.
47
48const tbb::tick_count::interval_t think_time(1.0);
49const tbb::tick_count::interval_t eat_time(1.0);
50const int num_times = 10;
51
52tbb::tick_count t0;
53bool verbose = false;
54
55const char *names[] = { "Archimedes", "Bakunin", "Confucius", "Democritus", "Euclid"
56 , "Favorinus", "Geminus", "Heraclitus", "Ichthyas", "Jason of Nysa",
57 "Kant", "Lavrov", "Metrocles", "Nausiphanes", "Onatas", "Phaedrus",
58 "Quillot", "Russell", "Socrates", "Thales", "Udayana",
59 "Vernadsky", "Wittgenstein", "Xenophilus", "Yen Yuan", "Zenodotus"
60};
61const int NumPhilosophers = sizeof(names) / sizeof(char*);
62
63struct RunOptions {
64 utility::thread_number_range threads;
65 int number_of_philosophers;
66 bool silent;
67 RunOptions(utility::thread_number_range threads_, int number_of_philosophers_, bool silent_) :
68 threads(threads_), number_of_philosophers(number_of_philosophers_), silent(silent_) { }
69};
70
71RunOptions ParseCommandLine(int argc, char *argv[]) {
72 int auto_threads = tbb::task_scheduler_init::default_num_threads();
73 utility::thread_number_range threads(tbb::task_scheduler_init::default_num_threads, auto_threads, auto_threads);
74 int nPhilosophers = 5;
75 bool verbose = false;
76 char charbuf[100];
77 std::sprintf(charbuf, "%d", NumPhilosophers);
78 std::string pCount = "how many philosophers, from 2-";
79 pCount += charbuf;
80
81 utility::cli_argument_pack cli_pack;
82 cli_pack.positional_arg(threads, "n-of_threads", utility::thread_number_range_desc)
83 .positional_arg(nPhilosophers, "n-of-philosophers", pCount)
84 .arg(verbose,"verbose","verbose output");
85 utility::parse_cli_arguments(argc, argv, cli_pack);
86 if(nPhilosophers < 2 || nPhilosophers > NumPhilosophers) {
87 std::cout << "Number of philosophers (" << nPhilosophers << ") out of range [2:" << NumPhilosophers << "]\n";
88 std::cout << cli_pack.usage_string(argv[0]) << std::flush;
89 std::exit(1);
90 }
91 return RunOptions(threads, nPhilosophers,!verbose);
92}
93
94
95tbb::spin_mutex my_mutex;
96
97class chopstick {};
98
99using namespace tbb::flow;
100
101typedef tbb::flow::tuple<continue_msg, chopstick, chopstick> join_output;
102typedef join_node< join_output, reserving > join_node_type;
103
104typedef function_node<continue_msg, continue_msg> think_node_type;
105typedef function_node<join_output, continue_msg> eat_node_type;
106typedef multifunction_node<continue_msg, join_output> forward_node_type;
107
108class philosopher {
109public:
110
111 philosopher( const char *name ) :
112 my_name(name), my_count(num_times) { }
113
114 ~philosopher() {
115 }
116
117 void check();
118 const char *name() const { return my_name; }
119
120private:
121
122 friend std::ostream& operator<<(std::ostream& o, philosopher const &p);
123
124 const char *my_name;
125 int my_count;
126
127 friend class think_node_body;
128 friend class eat_node_body;
129 friend class forward_node_body;
130
131 void think( );
132 void eat();
133 void forward( const continue_msg &in, forward_node_type::output_ports_type &out_ports );
134};
135
136std::ostream& operator<<(std::ostream& o, philosopher const &p) {
137 o << "< philosopher[" << reinterpret_cast<uintptr_t>(const_cast<philosopher *>(&p)) << "] " << p.name()
138 << ", my_count=" << p.my_count;
139 return o;
140}
141
142class think_node_body {
143 philosopher& my_philosopher;
144public:
145 think_node_body( philosopher &p ) : my_philosopher(p) { }
146 think_node_body( const think_node_body &other ) : my_philosopher(other.my_philosopher) { }
147 continue_msg operator()( continue_msg /*m*/) {
148 my_philosopher.think();
149 return continue_msg();
150 }
151};
152
153class eat_node_body {
154 philosopher &my_philosopher;
155public:
156 eat_node_body( philosopher &p) : my_philosopher(p) {}
157 eat_node_body( const eat_node_body &other ) : my_philosopher(other.my_philosopher) { }
158 continue_msg operator()(const join_output &in) {
159 my_philosopher.eat();
160 return continue_msg();
161 }
162};
163
164class forward_node_body {
165 philosopher &my_philosopher;
166public:
167 forward_node_body( philosopher &p) : my_philosopher(p) {}
168 forward_node_body( const forward_node_body &other ) : my_philosopher(other.my_philosopher) { }
169 void operator()( const continue_msg &in, forward_node_type::output_ports_type &out) {
170 my_philosopher.forward( in, out);
171 }
172};
173
174void philosopher::check() {
175 if ( my_count != 0 ) {
176 std::printf("ERROR: philosopher %s still had to run %d more times\n", name(), my_count);
177 std::exit(1);
178 }
179}
180
181void philosopher::forward( const continue_msg &/*in*/, forward_node_type::output_ports_type &out_ports ) {
182 if(my_count < 0) abort();
183 --my_count;
184 (void)tbb::flow::get<1>(out_ports).try_put(chopstick());
185 (void)tbb::flow::get<2>(out_ports).try_put(chopstick());
186 if (my_count > 0) {
187 (void)tbb::flow::get<0>(out_ports).try_put(continue_msg()); //start thinking again
188 } else {
189 if(verbose) {
190 tbb::spin_mutex::scoped_lock lock(my_mutex);
191 std::printf("%s has left the building\n", name());
192 }
193 }
194}
195
196void philosopher::eat() {
197 if(verbose) {
198 tbb::spin_mutex::scoped_lock lock(my_mutex);
199 std::printf("%s eating\n", name());
200 }
201 tbb::this_tbb_thread::sleep(eat_time);
202 if(verbose) {
203 tbb::spin_mutex::scoped_lock lock(my_mutex);
204 std::printf("%s done eating\n", name());
205 }
206}
207
208void philosopher::think() {
209 if(verbose) {
210 tbb::spin_mutex::scoped_lock lock(my_mutex);
211 std::printf("%s thinking\n", name());
212 }
213 tbb::this_tbb_thread::sleep(think_time);
214 if(verbose) {
215 tbb::spin_mutex::scoped_lock lock(my_mutex);
216 std::printf("%s done thinking\n", name());
217 }
218}
219
220typedef queue_node<continue_msg> thinking_done_type;
221
222int main(int argc, char *argv[]) {
223 try {
224 tbb::tick_count main_time = tbb::tick_count::now();
225 int num_threads;
226 int num_philosophers;
227
228 RunOptions options = ParseCommandLine(argc, argv);
229 num_philosophers = options.number_of_philosophers;
230 verbose = !options.silent;
231
232 for(num_threads = options.threads.first; num_threads <= options.threads.last; num_threads = options.threads.step(num_threads)) {
233
234 tbb::task_scheduler_init init(num_threads);
235
236 graph g;
237
238 if(verbose) std::cout << std::endl << num_philosophers << " philosophers with "
239 << num_threads << " threads" << std::endl << std::endl;
240 t0 = tbb::tick_count::now();
241
242 std::vector<queue_node<chopstick> > places(num_philosophers, queue_node<chopstick>(g));
243 std::vector<philosopher> philosophers;
244 philosophers.reserve(num_philosophers);
245 std::vector<think_node_type *> think_nodes;
246 think_nodes.reserve(num_philosophers);
247 std::vector<thinking_done_type> done_vector(num_philosophers, thinking_done_type(g));
248 std::vector<join_node_type> join_vector(num_philosophers,join_node_type(g));
249 std::vector<eat_node_type *> eat_nodes;
250 eat_nodes.reserve(num_philosophers);
251 std::vector<forward_node_type *> forward_nodes;
252 forward_nodes.reserve(num_philosophers);
253 for ( int i = 0; i < num_philosophers; ++i ) {
254 places[i].try_put(chopstick());
255 philosophers.push_back( philosopher( names[i] ) ); // allowed because of default generated assignment
256 if(verbose) {
257 tbb::spin_mutex::scoped_lock lock(my_mutex);
258 std::cout << "Built philosopher " << philosophers[i] << std::endl;
259 }
260 think_nodes.push_back(new think_node_type(g, unlimited, think_node_body(philosophers[i])));
261 eat_nodes.push_back( new eat_node_type(g, unlimited, eat_node_body(philosophers[i])));
262 forward_nodes.push_back( new forward_node_type(g, unlimited, forward_node_body(philosophers[i])));
263 }
264
265 // attach chopstick buffers and think function_nodes to joins
266 for(int i = 0; i < num_philosophers; ++i) {
267 make_edge( *think_nodes[i], done_vector[i] );
268 make_edge( done_vector[i], input_port<0>(join_vector[i]) );
269 make_edge( places[i], input_port<1>(join_vector[i]) ); // left chopstick
270 make_edge( places[(i+1) % num_philosophers], input_port<2>(join_vector[i]) ); // right chopstick
271 make_edge( join_vector[i], *eat_nodes[i] );
272 make_edge( *eat_nodes[i], *forward_nodes[i] );
273 make_edge( output_port<0>(*forward_nodes[i]), *think_nodes[i] );
274 make_edge( output_port<1>(*forward_nodes[i]), places[i] );
275 make_edge( output_port<2>(*forward_nodes[i]), places[(i+1) % num_philosophers] );
276 }
277
278 // start all the philosophers thinking
279 for(int i = 0; i < num_philosophers; ++i) think_nodes[i]->try_put(continue_msg());
280
281 g.wait_for_all();
282
283 tbb::tick_count t1 = tbb::tick_count::now();
284 if(verbose) std::cout << std::endl << num_philosophers << " philosophers with "
285 << num_threads << " threads have taken " << (t1-t0).seconds() << "seconds" << std::endl;
286
287 for ( int i = 0; i < num_philosophers; ++i ) philosophers[i].check();
288
289 for(int i = 0; i < num_philosophers; ++i) {
290 delete think_nodes[i];
291 delete eat_nodes[i];
292 delete forward_nodes[i];
293 }
294 }
295
296 utility::report_elapsed_time((tbb::tick_count::now() - main_time).seconds());
297 return 0;
298 } catch(std::exception& e) {
299 std::cerr<<"error occurred. error text is :\"" <<e.what()<<"\"\n";
300 return 1;
301 }
302}
303