1#include <iostream>
2
3#include <IO/WriteBufferFromOStream.h>
4#include <Storages/StorageLog.h>
5#include <Formats/FormatFactory.h>
6#include <DataStreams/LimitBlockInputStream.h>
7#include <DataStreams/IBlockOutputStream.h>
8#include <DataStreams/copyData.h>
9#include <DataTypes/DataTypesNumber.h>
10#include <Columns/ColumnsNumber.h>
11#include <Interpreters/Context.h>
12#include <Common/typeid_cast.h>
13
14
15int main(int, char **)
16try
17{
18 using namespace DB;
19
20 const size_t rows = 10000000;
21
22 /// create table with a pair of columns
23
24 NamesAndTypesList names_and_types;
25 names_and_types.emplace_back("a", std::make_shared<DataTypeUInt64>());
26 names_and_types.emplace_back("b", std::make_shared<DataTypeUInt8>());
27
28 auto context = Context::createGlobal();
29 context.makeGlobalContext();
30 context.setPath("./");
31
32 StoragePtr table = StorageLog::create("./", "test", "test", ColumnsDescription{names_and_types}, ConstraintsDescription{}, 1048576, context);
33 table->startup();
34
35 /// write into it
36 {
37 Block block;
38
39 {
40 ColumnWithTypeAndName column;
41 column.name = "a";
42 column.type = table->getColumn("a").type;
43 auto col = column.type->createColumn();
44 ColumnUInt64::Container & vec = typeid_cast<ColumnUInt64 &>(*col).getData();
45
46 vec.resize(rows);
47 for (size_t i = 0; i < rows; ++i)
48 vec[i] = i;
49
50 column.column = std::move(col);
51 block.insert(column);
52 }
53
54 {
55 ColumnWithTypeAndName column;
56 column.name = "b";
57 column.type = table->getColumn("b").type;
58 auto col = column.type->createColumn();
59 ColumnUInt8::Container & vec = typeid_cast<ColumnUInt8 &>(*col).getData();
60
61 vec.resize(rows);
62 for (size_t i = 0; i < rows; ++i)
63 vec[i] = i * 2;
64
65 column.column = std::move(col);
66 block.insert(column);
67 }
68
69 BlockOutputStreamPtr out = table->write({}, context);
70 out->write(block);
71 }
72
73 /// read from it
74 {
75 Names column_names;
76 column_names.push_back("a");
77 column_names.push_back("b");
78
79 QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
80
81 BlockInputStreamPtr in = table->read(column_names, {}, context, stage, 8192, 1)[0];
82
83 Block sample;
84 {
85 ColumnWithTypeAndName col;
86 col.type = std::make_shared<DataTypeUInt64>();
87 sample.insert(std::move(col));
88 }
89 {
90 ColumnWithTypeAndName col;
91 col.type = std::make_shared<DataTypeUInt8>();
92 sample.insert(std::move(col));
93 }
94
95 WriteBufferFromOStream out_buf(std::cout);
96
97 LimitBlockInputStream in_limit(in, 10, 0);
98 BlockOutputStreamPtr output = FormatFactory::instance().getOutput("TabSeparated", out_buf, sample, context);
99
100 copyData(in_limit, *output);
101 }
102
103 return 0;
104}
105catch (const DB::Exception & e)
106{
107 std::cerr << e.what() << ", " << e.displayText() << std::endl;
108 return 1;
109}
110