1 | #if __APPLE__ || __FreeBSD__ |
/// Entry point used when the target is Apple or FreeBSD: does no work and reports success.
int main(int /*argc*/, char ** /*argv*/)
{
    return 0;
}
3 | #else |
4 | |
5 | #include <fcntl.h> |
6 | #include <port/unistd.h> |
7 | #include <stdlib.h> |
8 | #include <time.h> |
9 | #include <iostream> |
10 | #include <iomanip> |
11 | #include <vector> |
12 | #include <Poco/Exception.h> |
13 | #include <Common/Exception.h> |
14 | #include <Common/ThreadPool.h> |
15 | #include <Common/Stopwatch.h> |
16 | #include <IO/BufferWithOwnMemory.h> |
17 | #include <IO/ReadHelpers.h> |
18 | #include <stdlib.h> |
19 | #include <fcntl.h> |
20 | #include <stdlib.h> |
21 | #include <stdio.h> |
22 | #include <sys/stat.h> |
23 | #include <sys/types.h> |
24 | #include <IO/AIO.h> |
25 | #include <malloc.h> |
26 | #include <sys/syscall.h> |
27 | |
28 | |
/// Error codes referenced by this utility; only declared here, the definitions live elsewhere.
namespace DB::ErrorCodes
{
    extern const int CANNOT_OPEN_FILE;      /// used when open() fails
    extern const int CANNOT_CLOSE_FILE;     /// used when close() fails
    extern const int CANNOT_IO_SUBMIT;      /// used when io_submit() fails
    extern const int CANNOT_IO_GETEVENTS;   /// used when io_getevents() fails
}
39 | |
40 | |
/// Direction of the benchmarked I/O. Kept as a plain enum because the values are stored in `int` variables.
enum Mode
{
    MODE_READ  = 1,   /// issue read requests
    MODE_WRITE = 2,   /// issue write requests
};
46 | |
47 | |
48 | void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t buffers_count, size_t count) |
49 | { |
50 | using namespace DB; |
51 | |
52 | AIOContext ctx; |
53 | |
54 | std::vector<Memory<>> buffers(buffers_count); |
55 | for (size_t i = 0; i < buffers_count; ++i) |
56 | buffers[i] = Memory<>(block_size, sysconf(_SC_PAGESIZE)); |
57 | |
58 | drand48_data rand_data; |
59 | timespec times; |
60 | clock_gettime(CLOCK_THREAD_CPUTIME_ID, ×); |
61 | srand48_r(times.tv_nsec, &rand_data); |
62 | |
63 | size_t in_progress = 0; |
64 | size_t blocks_sent = 0; |
65 | std::vector<bool> buffer_used(buffers_count, false); |
66 | std::vector<iocb> iocbs(buffers_count); |
67 | std::vector<iocb*> query_cbs; |
68 | std::vector<io_event> events(buffers_count); |
69 | |
70 | while (blocks_sent < count || in_progress > 0) |
71 | { |
72 | /// Prepare queries. |
73 | query_cbs.clear(); |
74 | for (size_t i = 0; i < buffers_count; ++i) |
75 | { |
76 | if (blocks_sent >= count || in_progress >= buffers_count) |
77 | break; |
78 | |
79 | if (buffer_used[i]) |
80 | continue; |
81 | |
82 | buffer_used[i] = true; |
83 | ++blocks_sent; |
84 | ++in_progress; |
85 | |
86 | char * buf = buffers[i].data(); |
87 | |
88 | long rand_result1 = 0; |
89 | long rand_result2 = 0; |
90 | long rand_result3 = 0; |
91 | lrand48_r(&rand_data, &rand_result1); |
92 | lrand48_r(&rand_data, &rand_result2); |
93 | lrand48_r(&rand_data, &rand_result3); |
94 | |
95 | size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43); |
96 | size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size; |
97 | |
98 | iocb & cb = iocbs[i]; |
99 | memset(&cb, 0, sizeof(cb)); |
100 | cb.aio_buf = reinterpret_cast<UInt64>(buf); |
101 | cb.aio_fildes = fd; |
102 | cb.aio_nbytes = block_size; |
103 | cb.aio_offset = offset; |
104 | cb.aio_data = static_cast<UInt64>(i); |
105 | |
106 | if (mode == MODE_READ) |
107 | { |
108 | cb.aio_lio_opcode = IOCB_CMD_PREAD; |
109 | } |
110 | else |
111 | { |
112 | cb.aio_lio_opcode = IOCB_CMD_PWRITE; |
113 | } |
114 | |
115 | query_cbs.push_back(&cb); |
116 | } |
117 | |
118 | /// Send queries. |
119 | if (io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]) < 0) |
120 | throwFromErrno("io_submit failed" , ErrorCodes::CANNOT_IO_SUBMIT); |
121 | |
122 | /// Receive answers. If we have something else to send, then receive at least one answer (after that send them), otherwise wait all answers. |
123 | memset(&events[0], 0, buffers_count * sizeof(events[0])); |
124 | int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], nullptr); |
125 | if (evs < 0) |
126 | throwFromErrno("io_getevents failed" , ErrorCodes::CANNOT_IO_GETEVENTS); |
127 | |
128 | for (int i = 0; i < evs; ++i) |
129 | { |
130 | int b = static_cast<int>(events[i].data); |
131 | if (events[i].res != static_cast<int>(block_size)) |
132 | throw Poco::Exception("read/write error" ); |
133 | --in_progress; |
134 | buffer_used[b] = false; |
135 | } |
136 | } |
137 | } |
138 | |
139 | |
140 | int mainImpl(int argc, char ** argv) |
141 | { |
142 | using namespace DB; |
143 | |
144 | const char * file_name = 0; |
145 | int mode = MODE_READ; |
146 | UInt64 min_offset = 0; |
147 | UInt64 max_offset = 0; |
148 | UInt64 block_size = 0; |
149 | UInt64 buffers_count = 0; |
150 | UInt64 threads_count = 0; |
151 | UInt64 count = 0; |
152 | |
153 | if (argc != 9) |
154 | { |
155 | std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size threads buffers count" << std::endl; |
156 | return 1; |
157 | } |
158 | |
159 | file_name = argv[1]; |
160 | if (argv[2][0] == 'w') |
161 | mode = MODE_WRITE; |
162 | min_offset = parse<UInt64>(argv[3]); |
163 | max_offset = parse<UInt64>(argv[4]); |
164 | block_size = parse<UInt64>(argv[5]); |
165 | threads_count = parse<UInt64>(argv[6]); |
166 | buffers_count = parse<UInt64>(argv[7]); |
167 | count = parse<UInt64>(argv[8]); |
168 | |
169 | int fd = open(file_name, ((mode == MODE_READ) ? O_RDONLY : O_WRONLY) | O_DIRECT); |
170 | if (-1 == fd) |
171 | throwFromErrno("Cannot open file" , ErrorCodes::CANNOT_OPEN_FILE); |
172 | |
173 | ThreadPool pool(threads_count); |
174 | |
175 | Stopwatch watch; |
176 | |
177 | for (size_t i = 0; i < threads_count; ++i) |
178 | pool.scheduleOrThrowOnError(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count)); |
179 | pool.wait(); |
180 | |
181 | watch.stop(); |
182 | |
183 | if (0 != close(fd)) |
184 | throwFromErrno("Cannot close file" , ErrorCodes::CANNOT_CLOSE_FILE); |
185 | |
186 | std::cout << std::fixed << std::setprecision(2) |
187 | << "Done " << count << " * " << threads_count << " ops" ; |
188 | std::cout << " in " << watch.elapsedSeconds() << " sec." |
189 | << ", " << count * threads_count / watch.elapsedSeconds() << " ops/sec." |
190 | << ", " << count * threads_count * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec." |
191 | << std::endl; |
192 | |
193 | return 0; |
194 | } |
195 | |
196 | |
197 | int main(int argc, char ** argv) |
198 | { |
199 | try |
200 | { |
201 | return mainImpl(argc, argv); |
202 | } |
203 | catch (const Poco::Exception & e) |
204 | { |
205 | std::cerr << e.what() << ", " << e.message() << std::endl; |
206 | return 1; |
207 | } |
208 | } |
209 | #endif |
210 | |