1#include <Common/ThreadPool.h>
2#include <IO/ReadBufferFromFile.h>
3#include <IO/WriteBufferFromFile.h>
4#include <IO/copyData.h>
5#include <Common/TaskStatsInfoGetter.h>
6#include <Poco/File.h>
7#include <Common/Stopwatch.h>
8#include <IO/WriteBufferFromString.h>
9#include <linux/taskstats.h>
10#include <sys/time.h>
11#include <sys/resource.h>
12#include <pthread.h>
13
14
15std::mutex mutex;
16
17
18static std::ostream & operator << (std::ostream & stream, const ::taskstats & stat)
19{
20#define PRINT(field) (stream << #field << " " << stat.field)
21
22 PRINT(ac_pid) << ", ";
23
24 PRINT(read_bytes) << ", ";
25 PRINT(write_bytes) << ", ";
26
27 PRINT(read_char) << ", ";
28 PRINT(write_char) << ", ";
29
30 PRINT(swapin_delay_total) << ", ";
31 PRINT(blkio_delay_total) << ", ";
32 PRINT(cpu_delay_total) << ", ";
33
34 PRINT(ac_pid) << ", ";
35
36 PRINT(ac_utime) << ", ";
37 PRINT(ac_stime) << ", ";
38
39#undef PRINT
40
41 return stream;
42}
43
44using namespace DB;
45
46
47static void do_io(size_t id)
48{
49 ::taskstats stat;
50 int tid = TaskStatsInfoGetter::getCurrentTID();
51 TaskStatsInfoGetter get_info;
52
53 get_info.getStat(stat, tid);
54 {
55 std::lock_guard lock(mutex);
56 std::cerr << "#" << id << ", tid " << tid << ", intitial\n" << stat << "\n";
57 }
58
59 size_t copy_size = 1048576 * (1 + id);
60 std::string path_dst = "test_out_" + std::to_string(id);
61
62 {
63 ReadBufferFromFile rb("/dev/urandom");
64 WriteBufferFromFile wb(path_dst, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0666, nullptr, 4096);
65 copyData(rb, wb, copy_size);
66 wb.close();
67 }
68
69 get_info.getStat(stat, tid);
70 {
71 std::lock_guard lock(mutex);
72 std::cerr << "#" << id << ", tid " << tid << ", step1\n" << stat << "\n";
73 }
74
75 {
76 ReadBufferFromFile rb(path_dst);
77 WriteBufferFromOwnString wb;
78 copyData(rb, wb, copy_size);
79 }
80
81 get_info.getStat(stat, tid);
82 {
83 std::lock_guard lock(mutex);
84 std::cerr << "#" << id << ", tid " << tid << ", step2\n" << stat << "\n";
85 }
86
87 {
88 ReadBufferFromFile rb(path_dst);
89 WriteBufferFromOwnString wb;
90 copyData(rb, wb, copy_size);
91 }
92
93 get_info.getStat(stat, tid);
94 {
95 std::lock_guard lock(mutex);
96 std::cerr << "#" << id << ", tid " << tid << ", step3\n" << stat << "\n";
97 }
98
99 Poco::File(path_dst).remove(false);
100}
101
102static void test_perf()
103{
104
105 ::taskstats stat;
106 int tid = TaskStatsInfoGetter::getCurrentTID();
107 TaskStatsInfoGetter get_info;
108
109 rusage rusage;
110
111 constexpr size_t num_samples = 1000000;
112 {
113 Stopwatch watch;
114 for (size_t i = 0; i < num_samples; ++i)
115 getrusage(RUSAGE_THREAD, &rusage);
116
117 auto ms = watch.elapsedMilliseconds();
118 if (ms > 0)
119 std::cerr << "RUsage: " << double(ms) / num_samples << " ms per call, " << 1000 * num_samples / ms << " calls per second\n";
120 }
121
122 {
123 Stopwatch watch;
124 for (size_t i = 0; i < num_samples; ++i)
125 get_info.getStat(stat, tid);
126
127 auto ms = watch.elapsedMilliseconds();
128 if (ms > 0)
129 std::cerr << "Netlink: " << double(ms) / num_samples << " ms per call, " << 1000 * num_samples / ms << " calls per second\n";
130 }
131
132 std::cerr << stat << "\n";
133}
134
135int main()
136try
137{
138 std::cerr << "pid " << getpid() << "\n";
139
140 size_t num_threads = 2;
141 ThreadPool pool(num_threads);
142 for (size_t i = 0; i < num_threads; ++i)
143 pool.scheduleOrThrowOnError([i]() { do_io(i); });
144 pool.wait();
145
146 test_perf();
147 return 0;
148}
149catch (...)
150{
151 std::cerr << getCurrentExceptionMessage(true);
152 return -1;
153}
154
155