X-Git-Url: https://gerrit.akraino.org/r/gitweb?a=blobdiff_plain;f=src%2Ftype3_AndroidCloud%2Fanbox-master%2Fsrc%2Fanbox%2Frpc%2Fmessage_processor.cpp;fp=src%2Ftype3_AndroidCloud%2Fanbox-master%2Fsrc%2Fanbox%2Frpc%2Fmessage_processor.cpp;h=ba25a5a9ffa0557e2241ee14911f21a6d34c7be0;hb=e26c1ec581be598521517829adba8c8dd23a768f;hp=0000000000000000000000000000000000000000;hpb=6699c1aea74eeb0eb400e6299079f0c7576f716f;p=iec.git diff --git a/src/type3_AndroidCloud/anbox-master/src/anbox/rpc/message_processor.cpp b/src/type3_AndroidCloud/anbox-master/src/anbox/rpc/message_processor.cpp new file mode 100644 index 0000000..ba25a5a --- /dev/null +++ b/src/type3_AndroidCloud/anbox-master/src/anbox/rpc/message_processor.cpp @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2016 Simon Fels + * + * This program is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 3, as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranties of + * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR + * PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see . + * + */ + +#include "anbox/rpc/message_processor.h" +#include "anbox/common/variable_length_array.h" +#include "anbox/rpc/constants.h" +#include "anbox/rpc/make_protobuf_object.h" +#include "anbox/rpc/template_message_processor.h" + +#include "anbox_rpc.pb.h" + +namespace anbox { +namespace rpc { +const ::std::string &Invocation::method_name() const { + return invocation_.method_name(); +} + +const ::std::string &Invocation::parameters() const { + return invocation_.parameters(); +} + +google::protobuf::uint32 Invocation::id() const { return invocation_.id(); } + +MessageProcessor::MessageProcessor( + const std::shared_ptr &sender, + const std::shared_ptr &pending_calls) + : sender_(sender), pending_calls_(pending_calls) {} + +MessageProcessor::~MessageProcessor() {} + +bool MessageProcessor::process_data(const std::vector &data) { + for (const auto &byte : data) buffer_.push_back(byte); + + while (buffer_.size() > 0) { + const auto high = buffer_[0]; + const auto medium = buffer_[1]; + const auto low = buffer_[2]; + size_t const message_size = (high << 16) + (medium << 8) + low; + const auto message_type = buffer_[3]; + + // If we don't have yet all bytes for a new message return and wait + // until we have all. + if (buffer_.size() - header_size < message_size) break; + + if (message_type == MessageType::invocation) { + anbox::protobuf::rpc::Invocation raw_invocation; + raw_invocation.ParseFromArray(buffer_.data() + header_size, message_size); + + dispatch(Invocation(raw_invocation)); + } else if (message_type == MessageType::response) { + auto result = make_protobuf_object(); + result->ParseFromArray(buffer_.data() + header_size, message_size); + + if (result->has_id()) { + pending_calls_->populate_message_for_result(*result, + [&](google::protobuf::MessageLite *result_message) { + result_message->ParseFromString(result->response()); + }); + pending_calls_->complete_response(*result); + } + + for (int n = 0; n < result->events_size(); n++) + process_event_sequence(result->events(n)); + } + + buffer_.erase(buffer_.begin(), + buffer_.begin() + header_size + message_size); + } + + return true; +} + +void MessageProcessor::send_response(::google::protobuf::uint32 id, + google::protobuf::MessageLite *response) { + VariableLengthArray send_response_buffer( + static_cast(response->ByteSize())); + + response->SerializeWithCachedSizesToArray(send_response_buffer.data()); + + anbox::protobuf::rpc::Result send_response_result; + send_response_result.set_id(id); + send_response_result.set_response(send_response_buffer.data(), + send_response_buffer.size()); + + send_response_buffer.resize(send_response_result.ByteSize()); + send_response_result.SerializeWithCachedSizesToArray( + send_response_buffer.data()); + + const size_t size = send_response_buffer.size(); + const unsigned char header_bytes[header_size] = { + static_cast((size >> 16) & 0xff), + static_cast((size >> 8) & 0xff), + static_cast((size >> 0) & 0xff), MessageType::response, + }; + + std::vector send_buffer(sizeof(header_bytes) + size); + std::copy(header_bytes, header_bytes + sizeof(header_bytes), + send_buffer.begin()); + std::copy(send_response_buffer.data(), + send_response_buffer.data() + send_response_buffer.size(), + send_buffer.begin() + sizeof(header_bytes)); + + sender_->send(reinterpret_cast(send_buffer.data()), + send_buffer.size()); +} +} // namespace anbox +} // namespace network