1
2// vim:sw=2:ai
3
4/*
5 * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
6 * See COPYRIGHT.txt for details.
7 */
8
9#include <my_global.h>
10#include <vector>
11#include <sys/socket.h>
12#include <netinet/in.h>
13#include <unistd.h>
14#include <sys/resource.h>
15
16#include "hstcpsvr.hpp"
17#include "hstcpsvr_worker.hpp"
18#include "thread.hpp"
19#include "fatal.hpp"
20#include "auto_ptrcontainer.hpp"
21
22#define DBG(x)
23
24namespace dena {
25
26struct worker_throbj {
27 worker_throbj(const hstcpsvr_worker_arg& arg)
28 : worker(hstcpsvr_worker_i::create(arg)) { }
29 void operator ()() {
30 worker->run();
31 }
32 hstcpsvr_worker_ptr worker;
33};
34
35struct hstcpsvr : public hstcpsvr_i, private noncopyable {
36 hstcpsvr(const config& c);
37 ~hstcpsvr();
38 virtual std::string start_listen();
39 private:
40 hstcpsvr_shared_c cshared;
41 volatile hstcpsvr_shared_v vshared;
42 typedef thread<worker_throbj> worker_thread_type;
43 typedef auto_ptrcontainer< std::vector<worker_thread_type *> > threads_type;
44 threads_type threads;
45 std::vector<unsigned int> thread_num_conns_vec;
46 private:
47 void stop_workers();
48};
49
50namespace {
51
52void
53check_nfile(size_t nfile)
54{
55 struct rlimit rl;
56 const int r = getrlimit(RLIMIT_NOFILE, &rl);
57 if (r != 0) {
58 fatal_abort("check_nfile: getrlimit failed");
59 }
60 if (rl.rlim_cur < static_cast<rlim_t>(nfile + 1000)) {
61 fprintf(stderr,
62 "[Warning] handlersocket: open_files_limit is too small.\n");
63 }
64}
65
66};
67
68hstcpsvr::hstcpsvr(const config& c)
69 : cshared(), vshared()
70{
71 vshared.shutdown = 0;
72 cshared.conf = c; /* copy */
73 if (cshared.conf["port"] == "") {
74 cshared.conf["port"] = "9999";
75 }
76 cshared.num_threads = cshared.conf.get_int("num_threads", 32);
77 cshared.sockargs.nonblocking = cshared.conf.get_int("nonblocking", 1);
78 cshared.sockargs.use_epoll = cshared.conf.get_int("use_epoll", 1);
79 if (cshared.sockargs.use_epoll) {
80 cshared.sockargs.nonblocking = 1;
81 }
82 cshared.readsize = cshared.conf.get_int("readsize", 1);
83 cshared.nb_conn_per_thread = cshared.conf.get_int("conn_per_thread", 1024);
84 cshared.for_write_flag = cshared.conf.get_int("for_write", 0);
85 cshared.plain_secret = cshared.conf.get_str("plain_secret", "");
86 cshared.require_auth = !cshared.plain_secret.empty();
87 cshared.sockargs.set(cshared.conf);
88 cshared.dbptr = database_i::create(c);
89 check_nfile(cshared.num_threads * cshared.nb_conn_per_thread);
90 thread_num_conns_vec.resize(cshared.num_threads);
91 cshared.thread_num_conns = thread_num_conns_vec.empty()
92 ? 0 : &thread_num_conns_vec[0];
93}
94
95hstcpsvr::~hstcpsvr()
96{
97 stop_workers();
98}
99
100std::string
101hstcpsvr::start_listen()
102{
103 std::string err;
104 if (threads.size() != 0) {
105 return "start_listen: already running";
106 }
107 if (socket_bind(cshared.listen_fd, cshared.sockargs, err) != 0) {
108 return "bind: " + err;
109 }
110 DENA_VERBOSE(20, fprintf(stderr, "bind done\n"));
111 const size_t stack_size = std::max(
112 cshared.conf.get_int("stack_size", 1 * 1024LL * 1024), 8 * 1024LL * 1024);
113 for (long i = 0; i < cshared.num_threads; ++i) {
114 hstcpsvr_worker_arg arg;
115 arg.cshared = &cshared;
116 arg.vshared = &vshared;
117 arg.worker_id = i;
118 std::auto_ptr< thread<worker_throbj> > thr(
119 new thread<worker_throbj>(arg, stack_size));
120 threads.push_back_ptr(thr);
121 }
122 DENA_VERBOSE(20, fprintf(stderr, "threads created\n"));
123 for (size_t i = 0; i < threads.size(); ++i) {
124 threads[i]->start();
125 }
126 DENA_VERBOSE(20, fprintf(stderr, "threads started\n"));
127 return std::string();
128}
129
130void
131hstcpsvr::stop_workers()
132{
133 vshared.shutdown = 1;
134 for (size_t i = 0; i < threads.size(); ++i) {
135 threads[i]->join();
136 }
137 threads.clear();
138}
139
140hstcpsvr_ptr
141hstcpsvr_i::create(const config& conf)
142{
143 return hstcpsvr_ptr(new hstcpsvr(conf));
144}
145
146};
147
148