1#pragma once
2
3#if defined(__linux__) || defined(__FreeBSD__)
4
5#include <condition_variable>
6#include <future>
7#include <mutex>
8#include <map>
9#include <IO/AIO.h>
10#include <Common/ThreadPool.h>
11
12
13namespace DB
14{
15
16class AIOContextPool : private boost::noncopyable
17{
18 static const auto max_concurrent_events = 128;
19 static const auto timeout_sec = 1;
20
21 AIOContext aio_context{max_concurrent_events};
22
23 using ID = size_t;
24 using BytesRead = ssize_t;
25
26 /// Autoincremental id used to identify completed requests
27 ID next_id{};
28 mutable std::mutex mutex;
29 mutable std::condition_variable have_resources;
30 std::map<ID, std::promise<BytesRead>> promises;
31
32 std::atomic<bool> cancelled{false};
33 ThreadFromGlobalPool io_completion_monitor{&AIOContextPool::doMonitor, this};
34
35 ~AIOContextPool();
36
37 void doMonitor();
38 void waitForCompletion();
39 int getCompletionEvents(io_event events[], const int max_events);
40 void fulfillPromises(const io_event events[], const int num_events);
41 void notifyProducers(const int num_producers) const;
42 void reportExceptionToAnyProducer();
43
44public:
45 static AIOContextPool & instance();
46
47 /// Request AIO read operation for iocb, returns a future with number of bytes read
48 std::future<BytesRead> post(struct iocb & iocb);
49};
50
51}
52
53#endif
54