1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include "Adaptor.hh"
20#include "orc/OrcFile.hh"
21#include "orc/Exceptions.hh"
22
23#include <errno.h>
24#include <fcntl.h>
25#include <stdio.h>
26#include <sys/stat.h>
27#include <string.h>
28
29#ifdef _MSC_VER
30#include <io.h>
31#define S_IRUSR _S_IREAD
32#define S_IWUSR _S_IWRITE
33#else
34#include <unistd.h>
35#define O_BINARY 0
36#endif
37
38namespace orc {
39
40 class FileInputStream : public InputStream {
41 private:
42 std::string filename;
43 int file;
44 uint64_t totalLength;
45
46 public:
47 FileInputStream(std::string _filename) {
48 filename = _filename;
49 file = open(filename.c_str(), O_BINARY | O_RDONLY);
50 if (file == -1) {
51 throw ParseError("Can't open " + filename);
52 }
53 struct stat fileStat;
54 if (fstat(file, &fileStat) == -1) {
55 throw ParseError("Can't stat " + filename);
56 }
57 totalLength = static_cast<uint64_t>(fileStat.st_size);
58 }
59
60 ~FileInputStream() override;
61
62 uint64_t getLength() const override {
63 return totalLength;
64 }
65
66 uint64_t getNaturalReadSize() const override {
67 return 128 * 1024;
68 }
69
70 void read(void* buf,
71 uint64_t length,
72 uint64_t offset) override {
73 if (!buf) {
74 throw ParseError("Buffer is null");
75 }
76 ssize_t bytesRead = pread(file, buf, length, static_cast<off_t>(offset));
77
78 if (bytesRead == -1) {
79 throw ParseError("Bad read of " + filename);
80 }
81 if (static_cast<uint64_t>(bytesRead) != length) {
82 throw ParseError("Short read of " + filename);
83 }
84 }
85
86 const std::string& getName() const override {
87 return filename;
88 }
89 };
90
91 FileInputStream::~FileInputStream() {
92 close(file);
93 }
94
95 std::unique_ptr<InputStream> readFile(const std::string& path) {
96#ifdef BUILD_LIBHDFSPP
97 if(strncmp (path.c_str(), "hdfs://", 7) == 0){
98 return orc::readHdfsFile(std::string(path));
99 } else {
100#endif
101 return orc::readLocalFile(std::string(path));
102#ifdef BUILD_LIBHDFSPP
103 }
104#endif
105 }
106
107 std::unique_ptr<InputStream> readLocalFile(const std::string& path) {
108 return std::unique_ptr<InputStream>(new FileInputStream(path));
109 }
110
111 OutputStream::~OutputStream() {
112 // PASS
113 };
114
115 class FileOutputStream : public OutputStream {
116 private:
117 std::string filename;
118 int file;
119 uint64_t bytesWritten;
120 bool closed;
121
122 public:
123 FileOutputStream(std::string _filename) {
124 bytesWritten = 0;
125 filename = _filename;
126 closed = false;
127 file = open(
128 filename.c_str(),
129 O_BINARY | O_CREAT | O_WRONLY | O_TRUNC,
130 S_IRUSR | S_IWUSR);
131 if (file == -1) {
132 throw ParseError("Can't open " + filename);
133 }
134 }
135
136 ~FileOutputStream() override;
137
138 uint64_t getLength() const override {
139 return bytesWritten;
140 }
141
142 uint64_t getNaturalWriteSize() const override {
143 return 128 * 1024;
144 }
145
146 void write(const void* buf, size_t length) override {
147 if (closed) {
148 throw std::logic_error("Cannot write to closed stream.");
149 }
150 ssize_t bytesWrite = ::write(file, buf, length);
151 if (bytesWrite == -1) {
152 throw ParseError("Bad write of " + filename);
153 }
154 if (static_cast<uint64_t>(bytesWrite) != length) {
155 throw ParseError("Short write of " + filename);
156 }
157 bytesWritten += static_cast<uint64_t>(bytesWrite);
158 }
159
160 const std::string& getName() const override {
161 return filename;
162 }
163
164 void close() override {
165 if (!closed) {
166 ::close(file);
167 closed = true;
168 }
169 }
170 };
171
172 FileOutputStream::~FileOutputStream() {
173 if (!closed) {
174 ::close(file);
175 closed = true;
176 }
177 }
178
179 std::unique_ptr<OutputStream> writeLocalFile(const std::string& path) {
180 return std::unique_ptr<OutputStream>(new FileOutputStream(path));
181 }
182}
183