TYPE3
[iec.git] / src / type3_AndroidCloud / anbox-master / src / anbox / graphics / buffered_io_stream.cpp
1 /*
2  * Copyright (C) 2016 Simon Fels <morphis@gravedo.de>
3  *
4  * This program is free software: you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 3, as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranties of
10  * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11  * PURPOSE.  See the GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program.  If not, see <http://www.gnu.org/licenses/>.
15  *
16  */
17
18 #include "anbox/graphics/buffered_io_stream.h"
19 #include "anbox/logger.h"
20
21 namespace anbox {
22 namespace graphics {
23 BufferedIOStream::BufferedIOStream(
24     const std::shared_ptr<anbox::network::SocketMessenger> &messenger,
25     size_t buffer_size)
26     : IOStream(buffer_size),
27       messenger_(messenger),
28       in_queue_(1024U),
29       out_queue_(16U),
30       worker_thread_(&BufferedIOStream::thread_main, this) {
31   write_buffer_.resize_noinit(buffer_size);
32 }
33
34 BufferedIOStream::~BufferedIOStream() {
35   forceStop();
36   if (worker_thread_.joinable()) worker_thread_.join();
37 }
38
39 void *BufferedIOStream::allocBuffer(size_t min_size) {
40   std::unique_lock<std::mutex> l(out_lock_);
41   if (write_buffer_.size() < min_size) write_buffer_.resize_noinit(min_size);
42   return write_buffer_.data();
43 }
44
45 size_t BufferedIOStream::commitBuffer(size_t size) {
46   std::unique_lock<std::mutex> l(out_lock_);
47   assert(size <= write_buffer_.size());
48   if (write_buffer_.isAllocated()) {
49     write_buffer_.resize(size);
50     out_queue_.push_locked(std::move(write_buffer_), l);
51   } else {
52     out_queue_.push_locked(
53         Buffer{write_buffer_.data(), write_buffer_.data() + size}, l);
54   }
55   return size;
56 }
57
58 const unsigned char *BufferedIOStream::read(void *buf, size_t *inout_len) {
59   std::unique_lock<std::mutex> l(lock_);
60   size_t wanted = *inout_len;
61   size_t count = 0U;
62   auto dst = static_cast<uint8_t *>(buf);
63   while (count < wanted) {
64     if (read_buffer_left_ > 0) {
65       size_t avail = std::min<size_t>(wanted - count, read_buffer_left_);
66       memcpy(dst + count,
67              read_buffer_.data() + (read_buffer_.size() - read_buffer_left_),
68              avail);
69       count += avail;
70       read_buffer_left_ -= avail;
71       continue;
72     }
73
74     bool blocking = (count == 0);
75     auto result = -EIO;
76     if (blocking)
77       result = in_queue_.pop_locked(&read_buffer_, l);
78     else
79       result = in_queue_.try_pop_locked(&read_buffer_);
80
81     if (result == 0) {
82       read_buffer_left_ = read_buffer_.size();
83       continue;
84     }
85
86     if (count > 0) break;
87
88     // If we end up here something went wrong and we couldn't read
89     // any valid data.
90     return nullptr;
91   }
92
93   *inout_len = count;
94   return static_cast<const unsigned char *>(buf);
95 }
96
97 void BufferedIOStream::forceStop() {
98   std::lock_guard<std::mutex> l(lock_);
99   in_queue_.close_locked();
100   out_queue_.close_locked();
101 }
102
103 void BufferedIOStream::post_data(Buffer &&data) {
104   std::unique_lock<std::mutex> l(lock_);
105   in_queue_.push_locked(std::move(data), l);
106 }
107
108 bool BufferedIOStream::needs_data() {
109   std::unique_lock<std::mutex> l(lock_);
110   return !in_queue_.can_pop_locked();
111 }
112
113 void BufferedIOStream::thread_main() {
114   while (true) {
115     std::unique_lock<std::mutex> l(out_lock_);
116
117     Buffer buffer;
118     const auto result = out_queue_.pop_locked(&buffer, l);
119     if (result != 0 && result != -EAGAIN) break;
120
121     auto bytes_left = buffer.size();
122     while (bytes_left > 0) {
123       const auto written = messenger_->send_raw(
124           buffer.data() + (buffer.size() - bytes_left), bytes_left);
125       if (written < 0) {
126         if (errno != EINTR && errno != EAGAIN) {
127           ERROR("Failed to write data: %s", std::strerror(errno));
128           break;
129         }
130         // Socket is busy, lets try again
131       } else
132         bytes_left -= written;
133     }
134   }
135 }
136 }  // namespace graphics
137 }  // namespace anbox