X-Git-Url: https://gerrit.akraino.org/r/gitweb?a=blobdiff_plain;f=src%2Ftype3_AndroidCloud%2Fanbox-master%2Fsrc%2Fanbox%2Frpc%2Fchannel.cpp;fp=src%2Ftype3_AndroidCloud%2Fanbox-master%2Fsrc%2Fanbox%2Frpc%2Fchannel.cpp;h=b0c2eada3e32d7490484033cf3d75e222ea8d18c;hb=e26c1ec581be598521517829adba8c8dd23a768f;hp=0000000000000000000000000000000000000000;hpb=6699c1aea74eeb0eb400e6299079f0c7576f716f;p=iec.git diff --git a/src/type3_AndroidCloud/anbox-master/src/anbox/rpc/channel.cpp b/src/type3_AndroidCloud/anbox-master/src/anbox/rpc/channel.cpp new file mode 100644 index 0000000..b0c2ead --- /dev/null +++ b/src/type3_AndroidCloud/anbox-master/src/anbox/rpc/channel.cpp @@ -0,0 +1,104 @@ +/* + * Copyright © 2012 Canonical Ltd. + * © 2016 Simon Fels + * + * This program is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + * + * Authored by: Alan Griffiths + */ + +#include "anbox/rpc/channel.h" +#include "anbox/common/variable_length_array.h" +#include "anbox/network/message_sender.h" +#include "anbox/rpc/constants.h" +#include "anbox/rpc/pending_call_cache.h" + +#include "anbox_rpc.pb.h" + +namespace anbox { +namespace rpc { +Channel::Channel(const std::shared_ptr &pending_calls, + const std::shared_ptr &sender) + : pending_calls_(pending_calls), sender_(sender) {} + +Channel::~Channel() {} + +void Channel::call_method(std::string const &method_name, + google::protobuf::MessageLite const *parameters, + google::protobuf::MessageLite *response, + google::protobuf::Closure *complete) { + auto const &invocation = invocation_for(method_name, parameters); + pending_calls_->save_completion_details(invocation, response, complete); + send_message(MessageType::invocation, invocation); +} + +void Channel::send_event(google::protobuf::MessageLite const &event) { + VariableLengthArray<2048> buffer{static_cast(event.ByteSize())}; + event.SerializeWithCachedSizesToArray(buffer.data()); + + anbox::protobuf::rpc::Result response; + response.add_events(buffer.data(), buffer.size()); + + send_message(MessageType::response, response); +} + +protobuf::rpc::Invocation Channel::invocation_for( + std::string const &method_name, + google::protobuf::MessageLite const *request) { + anbox::VariableLengthArray<2048> buffer{ + static_cast(request->ByteSize())}; + + request->SerializeWithCachedSizesToArray(buffer.data()); + + anbox::protobuf::rpc::Invocation invoke; + + invoke.set_id(next_id()); + invoke.set_method_name(method_name); + invoke.set_parameters(buffer.data(), buffer.size()); + invoke.set_protocol_version(1); + + return invoke; +} + +void Channel::send_message(const std::uint8_t &type, + google::protobuf::MessageLite const &message) { + const size_t size = message.ByteSize(); + const unsigned char header_bytes[header_size] = { + static_cast((size >>16) & 0xff), + static_cast((size >> 8) & 0xff), + static_cast((size >> 0) & 0xff), type, + }; + + std::vector send_buffer(sizeof(header_bytes) + size); + std::copy(header_bytes, header_bytes + sizeof(header_bytes), + send_buffer.begin()); + message.SerializeToArray(send_buffer.data() + sizeof(header_bytes), size); + + try { + std::lock_guard lock(write_mutex_); + sender_->send(reinterpret_cast(send_buffer.data()), + send_buffer.size()); + } catch (std::runtime_error const &) { + notify_disconnected(); + throw; + } +} + +void Channel::notify_disconnected() { pending_calls_->force_completion(); } + +std::uint32_t Channel::next_id() { + static std::uint32_t next_message_id = 0; + return next_message_id++; +} +} // namespace rpc +} // namespace anbox