Libosmium  2.8.0
Fast and flexible C++ library for working with OpenStreetMap data
writer.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_WRITER_HPP
2 #define OSMIUM_IO_WRITER_HPP
3 
4 /*
5 
6 This file is part of Osmium (http://osmcode.org/libosmium).
7 
8 Copyright 2013-2016 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
36 #include <cassert>
37 #include <future>
38 #include <memory>
39 #include <stdexcept>
40 #include <string>
41 #include <thread>
42 #include <utility>
43 
45 #include <osmium/io/detail/output_format.hpp>
46 #include <osmium/io/detail/queue_util.hpp>
47 #include <osmium/io/detail/read_write.hpp>
48 #include <osmium/io/detail/write_thread.hpp>
49 #include <osmium/io/error.hpp>
50 #include <osmium/io/file.hpp>
51 #include <osmium/io/header.hpp>
53 #include <osmium/memory/buffer.hpp>
54 #include <osmium/thread/util.hpp>
55 #include <osmium/util/config.hpp>
56 
57 namespace osmium {
58 
59  namespace io {
60 
61  namespace detail {
62 
63  inline size_t get_output_queue_size() noexcept {
64  size_t n = osmium::config::get_max_queue_size("OUTPUT", 20);
65  return n > 2 ? n : 2;
66  }
67 
68  } // namespace detail
69 
93  class Writer {
94 
// Value the constructor stores in m_buffer_size (10 * 1024 * 1024;
// presumably a size in bytes -- confirm against osmium::memory::Buffer).
95  static constexpr size_t default_buffer_size = 10 * 1024 * 1024;
96 
98 
// Queue constructed with detail::get_output_queue_size() and the name
// "raw_output". The constructor hands it to create_output() and, by
// reference, to the writer thread.
99  detail::future_string_queue_type m_output_queue;
100 
// Output format object created by the OutputFormatFactory for m_file.
// The header, all buffers and the end marker are written through it.
101  std::unique_ptr<osmium::io::detail::OutputFormat> m_output;
102 
104 
106 
// Future obtained from the promise that is given to the writer thread.
// do_flush() checks it for an exception, close() calls get() on it.
107  std::future<bool> m_write_future;
108 
110 
// State of this writer. ensure_cleanup() throws an io_error unless the
// status is okay, and switches it to error when an operation throws.
111  enum class status {
112  okay = 0, // normal writing
113  error = 1, // some error occurred while writing
114  closed = 2 // close() called successfully
115  } m_status;
116 
117  // This function will run in a separate thread.
118  static void write_thread(detail::future_string_queue_type& output_queue,
119  std::unique_ptr<osmium::io::Compressor>&& compressor,
120  std::promise<bool>&& write_promise) {
121  detail::WriteThread write_thread{output_queue,
122  std::move(compressor),
123  std::move(write_promise)};
124  write_thread();
125  }
126 
128  if (buffer && buffer.committed() > 0) {
129  m_output->write_buffer(std::move(buffer));
130  }
131  }
132 
/**
 * Hand the data collected in m_buffer to the output format.
 *
 * First calls osmium::thread::check_for_exception() on m_write_future
 * (NOTE(review): presumably rethrows an exception raised in the writer
 * thread -- confirm in osmium/thread/util.hpp). If m_buffer is valid and
 * has committed data, a newly created buffer is swapped into m_buffer and
 * the filled one is moved into m_output->write_buffer(). Otherwise
 * nothing is written.
 */
133  void do_flush() {
134  osmium::thread::check_for_exception(m_write_future);
135  if (m_buffer && m_buffer.committed() > 0) {
// NOTE(review): the listing skips line 137, which completes this initializer.
136  osmium::memory::Buffer buffer{m_buffer_size,
// ADL-friendly swap: m_buffer becomes the new, empty buffer.
138  using std::swap;
139  swap(m_buffer, buffer);
140 
141  m_output->write_buffer(std::move(buffer));
142  }
143  }
144 
145  template <typename TFunction, typename... TArgs>
146  void ensure_cleanup(TFunction func, TArgs&&... args) {
147  if (m_status != status::okay) {
148  throw io_error("Can not write to writer when in status 'closed' or 'error'");
149  }
150 
151  try {
152  func(std::forward<TArgs>(args)...);
153  } catch (...) {
154  m_status = status::error;
155  detail::add_to_queue(m_output_queue, std::current_exception());
156  detail::add_end_of_data_to_queue(m_output_queue);
157  throw;
158  }
159  }
160 
161  struct options_type {
163  overwrite allow_overwrite = overwrite::no;
164  fsync sync = fsync::no;
165  };
166 
167  static void set_option(options_type& options, const osmium::io::Header& header) {
168  options.header = header;
169  }
170 
171  static void set_option(options_type& options, overwrite value) {
172  options.allow_overwrite = value;
173  }
174 
175  static void set_option(options_type& options, fsync value) {
176  options.sync = value;
177  }
178 
179  public:
180 
/**
 * Create a writer for the given file.
 *
 * Sets up the output queue and the output format, opens the file,
 * creates the thread handler that runs write_thread() and finally
 * writes the header.
 *
 * @param file File to write to. The result of file.check() is stored.
 * @param args Optional settings in any order. Each one must be accepted
 *             by one of the set_option() overloads: an
 *             osmium::io::Header, an overwrite value or an fsync value.
 *             Defaults are overwrite::no and fsync::no.
 */
204  template <typename... TArgs>
205  explicit Writer(const osmium::io::File& file, TArgs&&... args) :
206  m_file(file.check()),
207  m_output_queue(detail::get_output_queue_size(), "raw_output"),
208  m_output(osmium::io::detail::OutputFormatFactory::instance().create_output(m_file, m_output_queue)),
209  m_buffer(),
210  m_buffer_size(default_buffer_size),
211  m_write_future(),
212  m_thread(),
213  m_status(status::okay) {
214  assert(!m_file.buffer()); // XXX can't handle pseudo-files
215 
216  options_type options;
// Call set_option() once per extra argument, left to right. The
// initializer_list<int> exists only to give the pack expansion a
// place to live; its contents are discarded.
217  (void)std::initializer_list<int>{
218  (set_option(options, args), 0)...
219  };
220 
// NOTE(review): the listing skips line 222, the start of the expression
// that creates the compressor from the opened file and the sync option.
221  std::unique_ptr<osmium::io::Compressor> compressor =
223  osmium::io::detail::open_for_writing(m_file.filename(), options.allow_overwrite),
224  options.sync);
225 
// Keep the future before the promise is moved into the thread handler.
226  std::promise<bool> write_promise;
227  m_write_future = write_promise.get_future();
228  m_thread = osmium::thread::thread_handler{write_thread, std::ref(m_output_queue), std::move(compressor), std::move(write_promise)};
229 
// Going through ensure_cleanup() means a failing write_header() puts
// the writer into error status and terminates the queue.
230  ensure_cleanup([&](){
231  m_output->write_header(options.header);
232  });
233  }
234 
235  template <typename... TArgs>
236  explicit Writer(const std::string& filename, TArgs&&... args) :
237  Writer(osmium::io::File(filename), std::forward<TArgs>(args)...) {
238  }
239 
240  template <typename... TArgs>
241  explicit Writer(const char* filename, TArgs&&... args) :
242  Writer(osmium::io::File(filename), std::forward<TArgs>(args)...) {
243  }
244 
// A Writer cannot be copied.
245  Writer(const Writer&) = delete;
246  Writer& operator=(const Writer&) = delete;
247 
// Move operations are defaulted.
// NOTE(review): the constructor passes std::ref(m_output_queue) to the
// writer thread and m_output_queue to create_output(). Confirm that a
// moved-to Writer cannot leave those referring to the moved-from object
// (or that the queue type turns these defaulted operations into deleted
// ones).
248  Writer(Writer&&) = default;
249  Writer& operator=(Writer&&) = default;
250 
251  ~Writer() noexcept {
252  try {
253  close();
254  } catch (...) {
255  // Ignore any exceptions because destructor must not throw.
256  }
257  }
258 
262  size_t buffer_size() const noexcept {
263  return m_buffer_size;
264  }
265 
270  void set_buffer_size(size_t size) noexcept {
271  m_buffer_size = size;
272  }
273 
281  void flush() {
282  ensure_cleanup([&](){
283  do_flush();
284  });
285  }
286 
296  ensure_cleanup([&](){
297  do_flush();
298  do_write(std::move(buffer));
299  });
300  }
301 
/**
 * Add a single item to the internal buffer.
 *
 * Runs under ensure_cleanup(), so it throws an io_error if the writer
 * is not in status okay and puts the writer into error status if
 * anything below throws.
 *
 * @param item Item to append to m_buffer.
 */
309  void operator()(const osmium::memory::Item& item) {
310  ensure_cleanup([&](){
// Create the internal buffer on first use.
311  if (!m_buffer) {
// NOTE(review): the listing skips line 313, which completes this initializer.
312  m_buffer = osmium::memory::Buffer{m_buffer_size,
314  }
315  try {
316  m_buffer.push_back(item);
317  } catch (osmium::buffer_is_full&) {
// No room left: flush and try exactly once more. If the second
// push_back() throws as well, that exception leaves the lambda.
318  do_flush();
319  m_buffer.push_back(item);
320  }
321  });
322  }
323 
333  void close() {
334  if (m_status == status::okay) {
335  ensure_cleanup([&](){
336  do_write(std::move(m_buffer));
337  m_output->write_end();
338  m_status = status::closed;
339  detail::add_end_of_data_to_queue(m_output_queue);
340  });
341  }
342 
343  if (m_write_future.valid()) {
344  m_write_future.get();
345  }
346  }
347 
348  }; // class Writer
349 
350  } // namespace io
351 
352 } // namespace osmium
353 
354 #endif // OSMIUM_IO_WRITER_HPP
fsync sync
Definition: writer.hpp:164
~Writer() noexcept
Definition: writer.hpp:251
void ensure_cleanup(TFunction func, TArgs &&...args)
Definition: writer.hpp:146
Definition: writer.hpp:161
static CompressionFactory & instance()
Definition: compression.hpp:151
void do_write(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:127
void do_flush()
Definition: writer.hpp:133
Definition: reader_iterator.hpp:39
void swap(Buffer &lhs, Buffer &rhs)
Definition: buffer.hpp:731
osmium::thread::thread_handler m_thread
Definition: writer.hpp:109
osmium::io::Header header
Definition: writer.hpp:162
osmium::memory::Buffer m_buffer
Definition: writer.hpp:103
std::unique_ptr< osmium::io::detail::OutputFormat > m_output
Definition: writer.hpp:101
void set_buffer_size(size_t size) noexcept
Definition: writer.hpp:270
Definition: file.hpp:74
Definition: item.hpp:97
static void write_thread(detail::future_string_queue_type &output_queue, std::unique_ptr< osmium::io::Compressor > &&compressor, std::promise< bool > &&write_promise)
Definition: writer.hpp:118
void operator()(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:295
Namespace for everything in the Osmium library.
Definition: assembler.hpp:66
status
Definition: writer.hpp:111
Definition: attr.hpp:298
size_t buffer_size() const noexcept
Definition: writer.hpp:262
Writer(const std::string &filename, TArgs &&...args)
Definition: writer.hpp:236
fsync
Definition: writer_options.hpp:51
static void set_option(options_type &options, fsync value)
Definition: writer.hpp:175
void push_back(const osmium::memory::Item &item)
Definition: buffer.hpp:488
Definition: error.hpp:44
size_t m_buffer_size
Definition: writer.hpp:105
void close()
Definition: writer.hpp:333
void flush()
Definition: writer.hpp:281
osmium::io::File m_file
Definition: writer.hpp:97
std::unique_ptr< osmium::io::Compressor > create_compressor(osmium::io::file_compression compression, TArgs &&...args)
Definition: compression.hpp:171
size_t committed() const noexcept
Definition: buffer.hpp:241
size_t get_max_queue_size(const char *queue_name, size_t default_value) noexcept
Definition: config.hpp:69
Definition: buffer.hpp:97
Definition: buffer.hpp:58
const char * buffer() const noexcept
Definition: file.hpp:158
static void set_option(options_type &options, overwrite value)
Definition: writer.hpp:171
Definition: writer.hpp:93
std::future< bool > m_write_future
Definition: writer.hpp:107
void check_for_exception(std::future< T > &future)
Definition: util.hpp:53
Writer(const osmium::io::File &file, TArgs &&...args)
Definition: writer.hpp:205
Definition: header.hpp:49
overwrite allow_overwrite
Definition: writer.hpp:163
static void set_option(options_type &options, const osmium::io::Header &header)
Definition: writer.hpp:167
detail::future_string_queue_type m_output_queue
Definition: writer.hpp:99
file_compression compression() const noexcept
Definition: file.hpp:291
File & filename(const std::string &filename)
Definition: file.hpp:309
Writer(const char *filename, TArgs &&...args)
Definition: writer.hpp:241
void operator()(const osmium::memory::Item &item)
Definition: writer.hpp:309
Definition: util.hpp:83
overwrite
Definition: writer_options.hpp:43