1#include <Common/Exception.h>
2
3#include <DataStreams/IBlockInputStream.h>
4
5#include <Storages/StorageMemory.h>
6#include <Storages/StorageFactory.h>
7
8#include <IO/WriteHelpers.h>
9
10
11namespace DB
12{
13
14namespace ErrorCodes
15{
16 extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
17}
18
19
20class MemoryBlockInputStream : public IBlockInputStream
21{
22public:
23 MemoryBlockInputStream(const Names & column_names_, BlocksList::iterator begin_, BlocksList::iterator end_, const StorageMemory & storage_)
24 : column_names(column_names_), begin(begin_), end(end_), it(begin), storage(storage_) {}
25
26 String getName() const override { return "Memory"; }
27
28 Block getHeader() const override { return storage.getSampleBlockForColumns(column_names); }
29
30protected:
31 Block readImpl() override
32 {
33 if (it == end)
34 {
35 return Block();
36 }
37 else
38 {
39 Block src = *it;
40 Block res;
41
42 /// Add only required columns to `res`.
43 for (size_t i = 0, size = column_names.size(); i < size; ++i)
44 res.insert(src.getByName(column_names[i]));
45
46 ++it;
47 return res;
48 }
49 }
50private:
51 Names column_names;
52 BlocksList::iterator begin;
53 BlocksList::iterator end;
54 BlocksList::iterator it;
55 const StorageMemory & storage;
56};
57
58
59class MemoryBlockOutputStream : public IBlockOutputStream
60{
61public:
62 explicit MemoryBlockOutputStream(StorageMemory & storage_) : storage(storage_) {}
63
64 Block getHeader() const override { return storage.getSampleBlock(); }
65
66 void write(const Block & block) override
67 {
68 storage.check(block, true);
69 std::lock_guard lock(storage.mutex);
70 storage.data.push_back(block);
71 }
72private:
73 StorageMemory & storage;
74};
75
76
77StorageMemory::StorageMemory(String database_name_, String table_name_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
78 : database_name(std::move(database_name_)), table_name(std::move(table_name_))
79{
80 setColumns(std::move(columns_description_));
81 setConstraints(std::move(constraints_));
82}
83
84
85BlockInputStreams StorageMemory::read(
86 const Names & column_names,
87 const SelectQueryInfo & /*query_info*/,
88 const Context & /*context*/,
89 QueryProcessingStage::Enum /*processed_stage*/,
90 size_t /*max_block_size*/,
91 unsigned num_streams)
92{
93 check(column_names);
94
95 std::lock_guard lock(mutex);
96
97 size_t size = data.size();
98
99 if (num_streams > size)
100 num_streams = size;
101
102 BlockInputStreams res;
103
104 for (size_t stream = 0; stream < num_streams; ++stream)
105 {
106 BlocksList::iterator begin = data.begin();
107 BlocksList::iterator end = data.begin();
108
109 std::advance(begin, stream * size / num_streams);
110 std::advance(end, (stream + 1) * size / num_streams);
111
112 res.push_back(std::make_shared<MemoryBlockInputStream>(column_names, begin, end, *this));
113 }
114
115 return res;
116}
117
118
119BlockOutputStreamPtr StorageMemory::write(
120 const ASTPtr & /*query*/, const Context & /*context*/)
121{
122 return std::make_shared<MemoryBlockOutputStream>(*this);
123}
124
125
126void StorageMemory::drop(TableStructureWriteLockHolder &)
127{
128 std::lock_guard lock(mutex);
129 data.clear();
130}
131
132void StorageMemory::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
133{
134 std::lock_guard lock(mutex);
135 data.clear();
136}
137
138
139void registerStorageMemory(StorageFactory & factory)
140{
141 factory.registerStorage("Memory", [](const StorageFactory::Arguments & args)
142 {
143 if (!args.engine_args.empty())
144 throw Exception(
145 "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
146 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
147
148 return StorageMemory::create(args.database_name, args.table_name, args.columns, args.constraints);
149 });
150}
151
152}
153