2 * Copyright (C) 2016 Simon Fels <morphis@gravedo.de>
4 * This program is free software: you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 3, as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranties of
10 * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11 * PURPOSE. See the GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License along
14 * with this program. If not, see <http://www.gnu.org/licenses/>.
18 #include "anbox/graphics/buffered_io_stream.h"
19 #include "anbox/logger.h"
23 BufferedIOStream::BufferedIOStream(
24 const std::shared_ptr<anbox::network::SocketMessenger> &messenger,
26 : IOStream(buffer_size),
27 messenger_(messenger),
30 worker_thread_(&BufferedIOStream::thread_main, this) {
31 write_buffer_.resize_noinit(buffer_size);
34 BufferedIOStream::~BufferedIOStream() {
36 if (worker_thread_.joinable()) worker_thread_.join();
39 void *BufferedIOStream::allocBuffer(size_t min_size) {
40 std::unique_lock<std::mutex> l(out_lock_);
41 if (write_buffer_.size() < min_size) write_buffer_.resize_noinit(min_size);
42 return write_buffer_.data();
45 size_t BufferedIOStream::commitBuffer(size_t size) {
46 std::unique_lock<std::mutex> l(out_lock_);
47 assert(size <= write_buffer_.size());
48 if (write_buffer_.isAllocated()) {
49 write_buffer_.resize(size);
50 out_queue_.push_locked(std::move(write_buffer_), l);
52 out_queue_.push_locked(
53 Buffer{write_buffer_.data(), write_buffer_.data() + size}, l);
58 const unsigned char *BufferedIOStream::read(void *buf, size_t *inout_len) {
59 std::unique_lock<std::mutex> l(lock_);
60 size_t wanted = *inout_len;
62 auto dst = static_cast<uint8_t *>(buf);
63 while (count < wanted) {
64 if (read_buffer_left_ > 0) {
65 size_t avail = std::min<size_t>(wanted - count, read_buffer_left_);
67 read_buffer_.data() + (read_buffer_.size() - read_buffer_left_),
70 read_buffer_left_ -= avail;
74 bool blocking = (count == 0);
77 result = in_queue_.pop_locked(&read_buffer_, l);
79 result = in_queue_.try_pop_locked(&read_buffer_);
82 read_buffer_left_ = read_buffer_.size();
88 // If we end up here something went wrong and we couldn't read
94 return static_cast<const unsigned char *>(buf);
97 void BufferedIOStream::forceStop() {
98 std::lock_guard<std::mutex> l(lock_);
99 in_queue_.close_locked();
100 out_queue_.close_locked();
103 void BufferedIOStream::post_data(Buffer &&data) {
104 std::unique_lock<std::mutex> l(lock_);
105 in_queue_.push_locked(std::move(data), l);
108 bool BufferedIOStream::needs_data() {
109 std::unique_lock<std::mutex> l(lock_);
110 return !in_queue_.can_pop_locked();
113 void BufferedIOStream::thread_main() {
115 std::unique_lock<std::mutex> l(out_lock_);
118 const auto result = out_queue_.pop_locked(&buffer, l);
119 if (result != 0 && result != -EAGAIN) break;
121 auto bytes_left = buffer.size();
122 while (bytes_left > 0) {
123 const auto written = messenger_->send_raw(
124 buffer.data() + (buffer.size() - bytes_left), bytes_left);
126 if (errno != EINTR && errno != EAGAIN) {
127 ERROR("Failed to write data: %s", std::strerror(errno));
130 // Socket is busy, lets try again
132 bytes_left -= written;
136 } // namespace graphics