/* * Copyright (C) 2016 Simon Fels * * This program is free software: you can redistribute it and/or modify it * under the terms of the GNU General Public License version 3, as published * by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranties of * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR * PURPOSE. See the GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program. If not, see . * */ #include "anbox/rpc/message_processor.h" #include "anbox/common/variable_length_array.h" #include "anbox/rpc/constants.h" #include "anbox/rpc/make_protobuf_object.h" #include "anbox/rpc/template_message_processor.h" #include "anbox_rpc.pb.h" namespace anbox { namespace rpc { const ::std::string &Invocation::method_name() const { return invocation_.method_name(); } const ::std::string &Invocation::parameters() const { return invocation_.parameters(); } google::protobuf::uint32 Invocation::id() const { return invocation_.id(); } MessageProcessor::MessageProcessor( const std::shared_ptr &sender, const std::shared_ptr &pending_calls) : sender_(sender), pending_calls_(pending_calls) {} MessageProcessor::~MessageProcessor() {} bool MessageProcessor::process_data(const std::vector &data) { for (const auto &byte : data) buffer_.push_back(byte); while (buffer_.size() > 0) { const auto high = buffer_[0]; const auto medium = buffer_[1]; const auto low = buffer_[2]; size_t const message_size = (high << 16) + (medium << 8) + low; const auto message_type = buffer_[3]; // If we don't have yet all bytes for a new message return and wait // until we have all. if (buffer_.size() - header_size < message_size) break; if (message_type == MessageType::invocation) { anbox::protobuf::rpc::Invocation raw_invocation; raw_invocation.ParseFromArray(buffer_.data() + header_size, message_size); dispatch(Invocation(raw_invocation)); } else if (message_type == MessageType::response) { auto result = make_protobuf_object(); result->ParseFromArray(buffer_.data() + header_size, message_size); if (result->has_id()) { pending_calls_->populate_message_for_result(*result, [&](google::protobuf::MessageLite *result_message) { result_message->ParseFromString(result->response()); }); pending_calls_->complete_response(*result); } for (int n = 0; n < result->events_size(); n++) process_event_sequence(result->events(n)); } buffer_.erase(buffer_.begin(), buffer_.begin() + header_size + message_size); } return true; } void MessageProcessor::send_response(::google::protobuf::uint32 id, google::protobuf::MessageLite *response) { VariableLengthArray send_response_buffer( static_cast(response->ByteSize())); response->SerializeWithCachedSizesToArray(send_response_buffer.data()); anbox::protobuf::rpc::Result send_response_result; send_response_result.set_id(id); send_response_result.set_response(send_response_buffer.data(), send_response_buffer.size()); send_response_buffer.resize(send_response_result.ByteSize()); send_response_result.SerializeWithCachedSizesToArray( send_response_buffer.data()); const size_t size = send_response_buffer.size(); const unsigned char header_bytes[header_size] = { static_cast((size >> 16) & 0xff), static_cast((size >> 8) & 0xff), static_cast((size >> 0) & 0xff), MessageType::response, }; std::vector send_buffer(sizeof(header_bytes) + size); std::copy(header_bytes, header_bytes + sizeof(header_bytes), send_buffer.begin()); std::copy(send_response_buffer.data(), send_response_buffer.data() + send_response_buffer.size(), send_buffer.begin() + sizeof(header_bytes)); sender_->send(reinterpret_cast(send_buffer.data()), send_buffer.size()); } } // namespace anbox } // namespace network