1#if __APPLE__ || __FreeBSD__
2int main(int, char **) { return 0; }
3#else
4
5#include <fcntl.h>
6#include <port/unistd.h>
7#include <stdlib.h>
8#include <time.h>
9#include <iostream>
10#include <iomanip>
11#include <vector>
12#include <Poco/Exception.h>
13#include <Common/Exception.h>
14#include <Common/ThreadPool.h>
15#include <Common/Stopwatch.h>
16#include <IO/BufferWithOwnMemory.h>
17#include <IO/ReadHelpers.h>
18#include <stdlib.h>
19#include <fcntl.h>
20#include <stdlib.h>
21#include <stdio.h>
22#include <sys/stat.h>
23#include <sys/types.h>
24#include <IO/AIO.h>
25#include <malloc.h>
26#include <sys/syscall.h>
27
28
29namespace DB
30{
31 namespace ErrorCodes
32 {
33 extern const int CANNOT_OPEN_FILE;
34 extern const int CANNOT_CLOSE_FILE;
35 extern const int CANNOT_IO_SUBMIT;
36 extern const int CANNOT_IO_GETEVENTS;
37 }
38}
39
40
41enum Mode
42{
43 MODE_READ = 1,
44 MODE_WRITE = 2,
45};
46
47
48void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t buffers_count, size_t count)
49{
50 using namespace DB;
51
52 AIOContext ctx;
53
54 std::vector<Memory<>> buffers(buffers_count);
55 for (size_t i = 0; i < buffers_count; ++i)
56 buffers[i] = Memory<>(block_size, sysconf(_SC_PAGESIZE));
57
58 drand48_data rand_data;
59 timespec times;
60 clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times);
61 srand48_r(times.tv_nsec, &rand_data);
62
63 size_t in_progress = 0;
64 size_t blocks_sent = 0;
65 std::vector<bool> buffer_used(buffers_count, false);
66 std::vector<iocb> iocbs(buffers_count);
67 std::vector<iocb*> query_cbs;
68 std::vector<io_event> events(buffers_count);
69
70 while (blocks_sent < count || in_progress > 0)
71 {
72 /// Prepare queries.
73 query_cbs.clear();
74 for (size_t i = 0; i < buffers_count; ++i)
75 {
76 if (blocks_sent >= count || in_progress >= buffers_count)
77 break;
78
79 if (buffer_used[i])
80 continue;
81
82 buffer_used[i] = true;
83 ++blocks_sent;
84 ++in_progress;
85
86 char * buf = buffers[i].data();
87
88 long rand_result1 = 0;
89 long rand_result2 = 0;
90 long rand_result3 = 0;
91 lrand48_r(&rand_data, &rand_result1);
92 lrand48_r(&rand_data, &rand_result2);
93 lrand48_r(&rand_data, &rand_result3);
94
95 size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
96 size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
97
98 iocb & cb = iocbs[i];
99 memset(&cb, 0, sizeof(cb));
100 cb.aio_buf = reinterpret_cast<UInt64>(buf);
101 cb.aio_fildes = fd;
102 cb.aio_nbytes = block_size;
103 cb.aio_offset = offset;
104 cb.aio_data = static_cast<UInt64>(i);
105
106 if (mode == MODE_READ)
107 {
108 cb.aio_lio_opcode = IOCB_CMD_PREAD;
109 }
110 else
111 {
112 cb.aio_lio_opcode = IOCB_CMD_PWRITE;
113 }
114
115 query_cbs.push_back(&cb);
116 }
117
118 /// Send queries.
119 if (io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]) < 0)
120 throwFromErrno("io_submit failed", ErrorCodes::CANNOT_IO_SUBMIT);
121
122 /// Receive answers. If we have something else to send, then receive at least one answer (after that send them), otherwise wait all answers.
123 memset(&events[0], 0, buffers_count * sizeof(events[0]));
124 int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], nullptr);
125 if (evs < 0)
126 throwFromErrno("io_getevents failed", ErrorCodes::CANNOT_IO_GETEVENTS);
127
128 for (int i = 0; i < evs; ++i)
129 {
130 int b = static_cast<int>(events[i].data);
131 if (events[i].res != static_cast<int>(block_size))
132 throw Poco::Exception("read/write error");
133 --in_progress;
134 buffer_used[b] = false;
135 }
136 }
137}
138
139
140int mainImpl(int argc, char ** argv)
141{
142 using namespace DB;
143
144 const char * file_name = 0;
145 int mode = MODE_READ;
146 UInt64 min_offset = 0;
147 UInt64 max_offset = 0;
148 UInt64 block_size = 0;
149 UInt64 buffers_count = 0;
150 UInt64 threads_count = 0;
151 UInt64 count = 0;
152
153 if (argc != 9)
154 {
155 std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size threads buffers count" << std::endl;
156 return 1;
157 }
158
159 file_name = argv[1];
160 if (argv[2][0] == 'w')
161 mode = MODE_WRITE;
162 min_offset = parse<UInt64>(argv[3]);
163 max_offset = parse<UInt64>(argv[4]);
164 block_size = parse<UInt64>(argv[5]);
165 threads_count = parse<UInt64>(argv[6]);
166 buffers_count = parse<UInt64>(argv[7]);
167 count = parse<UInt64>(argv[8]);
168
169 int fd = open(file_name, ((mode == MODE_READ) ? O_RDONLY : O_WRONLY) | O_DIRECT);
170 if (-1 == fd)
171 throwFromErrno("Cannot open file", ErrorCodes::CANNOT_OPEN_FILE);
172
173 ThreadPool pool(threads_count);
174
175 Stopwatch watch;
176
177 for (size_t i = 0; i < threads_count; ++i)
178 pool.scheduleOrThrowOnError(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count));
179 pool.wait();
180
181 watch.stop();
182
183 if (0 != close(fd))
184 throwFromErrno("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
185
186 std::cout << std::fixed << std::setprecision(2)
187 << "Done " << count << " * " << threads_count << " ops";
188 std::cout << " in " << watch.elapsedSeconds() << " sec."
189 << ", " << count * threads_count / watch.elapsedSeconds() << " ops/sec."
190 << ", " << count * threads_count * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec."
191 << std::endl;
192
193 return 0;
194}
195
196
197int main(int argc, char ** argv)
198{
199 try
200 {
201 return mainImpl(argc, argv);
202 }
203 catch (const Poco::Exception & e)
204 {
205 std::cerr << e.what() << ", " << e.message() << std::endl;
206 return 1;
207 }
208}
209#endif
210