1#include <Storages/MergeTree/MergeTreePartInfo.h>
2#include <Storages/MergeTree/MergeTreeDataPart.h>
3#include <DataTypes/DataTypeDate.h>
4#include <IO/ReadBufferFromFile.h>
5#include <IO/WriteBufferFromFile.h>
6#include <IO/HashingWriteBuffer.h>
7#include <Common/Exception.h>
8#include <Common/localBackup.h>
9#include <Parsers/ASTFunction.h>
10#include <Parsers/ASTIdentifier.h>
11
12#include <boost/program_options.hpp>
13#include <Poco/Path.h>
14#include <Poco/File.h>
15
16#include <iostream>
17
18namespace DB
19{
20
/// Error codes used by the exceptions thrown in this file.
/// They are only declared here (`extern`); the definitions must come from another translation unit.
namespace ErrorCodes
{
 extern const int DIRECTORY_ALREADY_EXISTS;
 extern const int BAD_DATA_PART_NAME;
 extern const int NO_FILE_IN_DATA_PART;
}
27
28void run(String part_path, String date_column, String dest_path)
29{
30 auto old_part_path = Poco::Path::forDirectory(part_path);
31 String old_part_name = old_part_path.directory(old_part_path.depth() - 1);
32 String old_part_path_str = old_part_path.toString();
33
34 auto part_info = MergeTreePartInfo::fromPartName(old_part_name, MergeTreeDataFormatVersion(0));
35 String new_part_name = part_info.getPartName();
36
37 auto new_part_path = Poco::Path::forDirectory(dest_path);
38 new_part_path.pushDirectory(new_part_name);
39 if (Poco::File(new_part_path).exists())
40 throw Exception("Destination part directory `" + new_part_path.toString() + "` already exists",
41 ErrorCodes::DIRECTORY_ALREADY_EXISTS);
42
43 DayNum min_date;
44 DayNum max_date;
45 MergeTreePartInfo::parseMinMaxDatesFromPartName(old_part_name, min_date, max_date);
46
47 UInt32 yyyymm = DateLUT::instance().toNumYYYYMM(min_date);
48 if (yyyymm != DateLUT::instance().toNumYYYYMM(max_date))
49 throw Exception("Part " + old_part_name + " spans different months",
50 ErrorCodes::BAD_DATA_PART_NAME);
51
52 ReadBufferFromFile checksums_in(old_part_path_str + "checksums.txt", 4096);
53 MergeTreeDataPartChecksums checksums;
54 checksums.read(checksums_in);
55
56 auto date_col_checksum_it = checksums.files.find(date_column + ".bin");
57 if (date_col_checksum_it == checksums.files.end())
58 throw Exception("Couldn't find checksum for the date column .bin file `" + date_column + ".bin`",
59 ErrorCodes::NO_FILE_IN_DATA_PART);
60
61 UInt64 rows = date_col_checksum_it->second.uncompressed_size / DataTypeDate().getSizeOfValueInMemory();
62
63 auto new_tmp_part_path = Poco::Path::forDirectory(dest_path);
64 new_tmp_part_path.pushDirectory("tmp_convert_" + new_part_name);
65 String new_tmp_part_path_str = new_tmp_part_path.toString();
66 try
67 {
68 Poco::File(new_tmp_part_path).remove(/* recursive = */ true);
69 }
70 catch (const Poco::FileNotFoundException &)
71 {
72 /// If the file is already deleted, do nothing.
73 }
74 localBackup(old_part_path, new_tmp_part_path, {});
75
76 WriteBufferFromFile count_out(new_tmp_part_path_str + "count.txt", 4096);
77 HashingWriteBuffer count_out_hashing(count_out);
78 writeIntText(rows, count_out_hashing);
79 count_out_hashing.next();
80 checksums.files["count.txt"].file_size = count_out_hashing.count();
81 checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
82
83 MergeTreeDataPart::MinMaxIndex minmax_idx(min_date, max_date);
84 Names minmax_idx_columns = {date_column};
85 DataTypes minmax_idx_column_types = {std::make_shared<DataTypeDate>()};
86 minmax_idx.store(minmax_idx_columns, minmax_idx_column_types, new_tmp_part_path_str, checksums);
87
88 Block partition_key_sample{{nullptr, std::make_shared<DataTypeUInt32>(), makeASTFunction("toYYYYMM", std::make_shared<ASTIdentifier>(date_column))->getColumnName()}};
89
90 MergeTreePartition partition(yyyymm);
91 partition.store(partition_key_sample, new_tmp_part_path_str, checksums);
92 String partition_id = partition.getID(partition_key_sample);
93
94 Poco::File(new_tmp_part_path_str + "checksums.txt").setWriteable();
95 WriteBufferFromFile checksums_out(new_tmp_part_path_str + "checksums.txt", 4096);
96 checksums.write(checksums_out);
97
98 Poco::File(new_tmp_part_path).renameTo(new_part_path.toString());
99}
100
101}
102
103int main(int argc, char ** argv)
104try
105{
106 boost::program_options::options_description desc("Allowed options");
107 desc.add_options()
108 ("help,h", "produce help message")
109 ("part", boost::program_options::value<std::string>()->required(),
110 "part directory to convert")
111 ("date-column", boost::program_options::value<std::string>()->required(),
112 "name of the date column")
113 ("to", boost::program_options::value<std::string>()->required(),
114 "destination directory")
115 ;
116
117 boost::program_options::variables_map options;
118 boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
119
120 if (options.count("help") || options.size() < 3)
121 {
122 std::cout
123 << "Convert a MergeTree part from the old-style month-partitioned table "
124 << "(e.g. 20140317_20140323_2_2_0) to the format suitable for ATTACH'ing to a custom-partitioned "
125 << "table (201403_2_2_0)." << std::endl << std::endl;
126 std::cout << desc << std::endl;
127 return 1;
128 }
129
130 auto part_path = options.at("part").as<DB::String>();
131 auto date_column = options.at("date-column").as<DB::String>();
132 auto dest_path = options.at("to").as<DB::String>();
133
134 DB::run(part_path, date_column, dest_path);
135
136 return 0;
137}
138catch (...)
139{
140 std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
141 throw;
142}
143