1 | |
2 | // vim:sw=2:ai |
3 | |
4 | /* |
5 | * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. |
6 | * See COPYRIGHT.txt for details. |
7 | */ |
8 | |
9 | #include <my_global.h> |
10 | #include <vector> |
11 | #include <sys/socket.h> |
12 | #include <netinet/in.h> |
13 | #include <unistd.h> |
14 | #include <sys/resource.h> |
15 | |
16 | #include "hstcpsvr.hpp" |
17 | #include "hstcpsvr_worker.hpp" |
18 | #include "thread.hpp" |
19 | #include "fatal.hpp" |
20 | #include "auto_ptrcontainer.hpp" |
21 | |
22 | #define DBG(x) |
23 | |
24 | namespace dena { |
25 | |
26 | struct worker_throbj { |
27 | worker_throbj(const hstcpsvr_worker_arg& arg) |
28 | : worker(hstcpsvr_worker_i::create(arg)) { } |
29 | void operator ()() { |
30 | worker->run(); |
31 | } |
32 | hstcpsvr_worker_ptr worker; |
33 | }; |
34 | |
35 | struct hstcpsvr : public hstcpsvr_i, private noncopyable { |
36 | hstcpsvr(const config& c); |
37 | ~hstcpsvr(); |
38 | virtual std::string start_listen(); |
39 | private: |
40 | hstcpsvr_shared_c cshared; |
41 | volatile hstcpsvr_shared_v vshared; |
42 | typedef thread<worker_throbj> worker_thread_type; |
43 | typedef auto_ptrcontainer< std::vector<worker_thread_type *> > threads_type; |
44 | threads_type threads; |
45 | std::vector<unsigned int> thread_num_conns_vec; |
46 | private: |
47 | void stop_workers(); |
48 | }; |
49 | |
50 | namespace { |
51 | |
52 | void |
53 | check_nfile(size_t nfile) |
54 | { |
55 | struct rlimit rl; |
56 | const int r = getrlimit(RLIMIT_NOFILE, &rl); |
57 | if (r != 0) { |
58 | fatal_abort("check_nfile: getrlimit failed" ); |
59 | } |
60 | if (rl.rlim_cur < static_cast<rlim_t>(nfile + 1000)) { |
61 | fprintf(stderr, |
62 | "[Warning] handlersocket: open_files_limit is too small.\n" ); |
63 | } |
64 | } |
65 | |
66 | }; |
67 | |
68 | hstcpsvr::hstcpsvr(const config& c) |
69 | : cshared(), vshared() |
70 | { |
71 | vshared.shutdown = 0; |
72 | cshared.conf = c; /* copy */ |
73 | if (cshared.conf["port" ] == "" ) { |
74 | cshared.conf["port" ] = "9999" ; |
75 | } |
76 | cshared.num_threads = cshared.conf.get_int("num_threads" , 32); |
77 | cshared.sockargs.nonblocking = cshared.conf.get_int("nonblocking" , 1); |
78 | cshared.sockargs.use_epoll = cshared.conf.get_int("use_epoll" , 1); |
79 | if (cshared.sockargs.use_epoll) { |
80 | cshared.sockargs.nonblocking = 1; |
81 | } |
82 | cshared.readsize = cshared.conf.get_int("readsize" , 1); |
83 | cshared.nb_conn_per_thread = cshared.conf.get_int("conn_per_thread" , 1024); |
84 | cshared.for_write_flag = cshared.conf.get_int("for_write" , 0); |
85 | cshared.plain_secret = cshared.conf.get_str("plain_secret" , "" ); |
86 | cshared.require_auth = !cshared.plain_secret.empty(); |
87 | cshared.sockargs.set(cshared.conf); |
88 | cshared.dbptr = database_i::create(c); |
89 | check_nfile(cshared.num_threads * cshared.nb_conn_per_thread); |
90 | thread_num_conns_vec.resize(cshared.num_threads); |
91 | cshared.thread_num_conns = thread_num_conns_vec.empty() |
92 | ? 0 : &thread_num_conns_vec[0]; |
93 | } |
94 | |
95 | hstcpsvr::~hstcpsvr() |
96 | { |
97 | stop_workers(); |
98 | } |
99 | |
100 | std::string |
101 | hstcpsvr::start_listen() |
102 | { |
103 | std::string err; |
104 | if (threads.size() != 0) { |
105 | return "start_listen: already running" ; |
106 | } |
107 | if (socket_bind(cshared.listen_fd, cshared.sockargs, err) != 0) { |
108 | return "bind: " + err; |
109 | } |
110 | DENA_VERBOSE(20, fprintf(stderr, "bind done\n" )); |
111 | const size_t stack_size = std::max( |
112 | cshared.conf.get_int("stack_size" , 1 * 1024LL * 1024), 8 * 1024LL * 1024); |
113 | for (long i = 0; i < cshared.num_threads; ++i) { |
114 | hstcpsvr_worker_arg arg; |
115 | arg.cshared = &cshared; |
116 | arg.vshared = &vshared; |
117 | arg.worker_id = i; |
118 | std::auto_ptr< thread<worker_throbj> > thr( |
119 | new thread<worker_throbj>(arg, stack_size)); |
120 | threads.push_back_ptr(thr); |
121 | } |
122 | DENA_VERBOSE(20, fprintf(stderr, "threads created\n" )); |
123 | for (size_t i = 0; i < threads.size(); ++i) { |
124 | threads[i]->start(); |
125 | } |
126 | DENA_VERBOSE(20, fprintf(stderr, "threads started\n" )); |
127 | return std::string(); |
128 | } |
129 | |
130 | void |
131 | hstcpsvr::stop_workers() |
132 | { |
133 | vshared.shutdown = 1; |
134 | for (size_t i = 0; i < threads.size(); ++i) { |
135 | threads[i]->join(); |
136 | } |
137 | threads.clear(); |
138 | } |
139 | |
140 | hstcpsvr_ptr |
141 | hstcpsvr_i::create(const config& conf) |
142 | { |
143 | return hstcpsvr_ptr(new hstcpsvr(conf)); |
144 | } |
145 | |
146 | }; |
147 | |
148 | |