2 * Copyright (C) 2016 Simon Fels <morphis@gravedo.de>
4 * This program is free software: you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 3, as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranties of
10 * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11 * PURPOSE. See the GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License along
14 * with this program. If not, see <http://www.gnu.org/licenses/>.
18 #include "anbox/rpc/message_processor.h"
19 #include "anbox/common/variable_length_array.h"
20 #include "anbox/rpc/constants.h"
21 #include "anbox/rpc/make_protobuf_object.h"
22 #include "anbox/rpc/template_message_processor.h"
24 #include "anbox_rpc.pb.h"
28 const ::std::string &Invocation::method_name() const {
29 return invocation_.method_name();
32 const ::std::string &Invocation::parameters() const {
33 return invocation_.parameters();
36 google::protobuf::uint32 Invocation::id() const { return invocation_.id(); }
38 MessageProcessor::MessageProcessor(
39 const std::shared_ptr<network::MessageSender> &sender,
40 const std::shared_ptr<PendingCallCache> &pending_calls)
41 : sender_(sender), pending_calls_(pending_calls) {}
43 MessageProcessor::~MessageProcessor() {}
45 bool MessageProcessor::process_data(const std::vector<std::uint8_t> &data) {
46 for (const auto &byte : data) buffer_.push_back(byte);
48 while (buffer_.size() > 0) {
49 const auto high = buffer_[0];
50 const auto medium = buffer_[1];
51 const auto low = buffer_[2];
52 size_t const message_size = (high << 16) + (medium << 8) + low;
53 const auto message_type = buffer_[3];
55 // If we don't have yet all bytes for a new message return and wait
57 if (buffer_.size() - header_size < message_size) break;
59 if (message_type == MessageType::invocation) {
60 anbox::protobuf::rpc::Invocation raw_invocation;
61 raw_invocation.ParseFromArray(buffer_.data() + header_size, message_size);
63 dispatch(Invocation(raw_invocation));
64 } else if (message_type == MessageType::response) {
65 auto result = make_protobuf_object<protobuf::rpc::Result>();
66 result->ParseFromArray(buffer_.data() + header_size, message_size);
68 if (result->has_id()) {
69 pending_calls_->populate_message_for_result(*result,
70 [&](google::protobuf::MessageLite *result_message) {
71 result_message->ParseFromString(result->response());
73 pending_calls_->complete_response(*result);
76 for (int n = 0; n < result->events_size(); n++)
77 process_event_sequence(result->events(n));
80 buffer_.erase(buffer_.begin(),
81 buffer_.begin() + header_size + message_size);
87 void MessageProcessor::send_response(::google::protobuf::uint32 id,
88 google::protobuf::MessageLite *response) {
89 VariableLengthArray<serialization_buffer_size> send_response_buffer(
90 static_cast<size_t>(response->ByteSize()));
92 response->SerializeWithCachedSizesToArray(send_response_buffer.data());
94 anbox::protobuf::rpc::Result send_response_result;
95 send_response_result.set_id(id);
96 send_response_result.set_response(send_response_buffer.data(),
97 send_response_buffer.size());
99 send_response_buffer.resize(send_response_result.ByteSize());
100 send_response_result.SerializeWithCachedSizesToArray(
101 send_response_buffer.data());
103 const size_t size = send_response_buffer.size();
104 const unsigned char header_bytes[header_size] = {
105 static_cast<unsigned char>((size >> 16) & 0xff),
106 static_cast<unsigned char>((size >> 8) & 0xff),
107 static_cast<unsigned char>((size >> 0) & 0xff), MessageType::response,
110 std::vector<std::uint8_t> send_buffer(sizeof(header_bytes) + size);
111 std::copy(header_bytes, header_bytes + sizeof(header_bytes),
112 send_buffer.begin());
113 std::copy(send_response_buffer.data(),
114 send_response_buffer.data() + send_response_buffer.size(),
115 send_buffer.begin() + sizeof(header_bytes));
117 sender_->send(reinterpret_cast<const char *>(send_buffer.data()),
121 } // namespace network