TYPE3
[iec.git] / src / type3_AndroidCloud / anbox-master / src / anbox / rpc / message_processor.cpp
diff --git a/src/type3_AndroidCloud/anbox-master/src/anbox/rpc/message_processor.cpp b/src/type3_AndroidCloud/anbox-master/src/anbox/rpc/message_processor.cpp
new file mode 100644 (file)
index 0000000..ba25a5a
--- /dev/null
@@ -0,0 +1,121 @@
+/*
+ * Copyright (C) 2016 Simon Fels <morphis@gravedo.de>
+ *
+ * This program is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 3, as published
+ * by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranties of
+ * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "anbox/rpc/message_processor.h"
+#include "anbox/common/variable_length_array.h"
+#include "anbox/rpc/constants.h"
+#include "anbox/rpc/make_protobuf_object.h"
+#include "anbox/rpc/template_message_processor.h"
+
+#include "anbox_rpc.pb.h"
+
+namespace anbox {
+namespace rpc {
+const ::std::string &Invocation::method_name() const {
+  return invocation_.method_name();
+}
+
+const ::std::string &Invocation::parameters() const {
+  return invocation_.parameters();
+}
+
+google::protobuf::uint32 Invocation::id() const { return invocation_.id(); }
+
+MessageProcessor::MessageProcessor(
+    const std::shared_ptr<network::MessageSender> &sender,
+    const std::shared_ptr<PendingCallCache> &pending_calls)
+    : sender_(sender), pending_calls_(pending_calls) {}
+
+MessageProcessor::~MessageProcessor() {}
+
+bool MessageProcessor::process_data(const std::vector<std::uint8_t> &data) {
+  for (const auto &byte : data) buffer_.push_back(byte);
+
+  while (buffer_.size() > 0) {
+    const auto high = buffer_[0];
+    const auto medium = buffer_[1];
+    const auto low = buffer_[2];
+    size_t const message_size = (high << 16) + (medium << 8) + low;
+    const auto message_type = buffer_[3];
+
+    // If we don't have yet all bytes for a new message return and wait
+    // until we have all.
+    if (buffer_.size() - header_size < message_size) break;
+
+    if (message_type == MessageType::invocation) {
+      anbox::protobuf::rpc::Invocation raw_invocation;
+      raw_invocation.ParseFromArray(buffer_.data() + header_size, message_size);
+
+      dispatch(Invocation(raw_invocation));
+    } else if (message_type == MessageType::response) {
+      auto result = make_protobuf_object<protobuf::rpc::Result>();
+      result->ParseFromArray(buffer_.data() + header_size, message_size);
+
+      if (result->has_id()) {
+        pending_calls_->populate_message_for_result(*result,
+                                                    [&](google::protobuf::MessageLite *result_message) {
+                                                      result_message->ParseFromString(result->response());
+                                                    });
+        pending_calls_->complete_response(*result);
+      }
+
+      for (int n = 0; n < result->events_size(); n++)
+        process_event_sequence(result->events(n));
+    }
+
+    buffer_.erase(buffer_.begin(),
+                  buffer_.begin() + header_size + message_size);
+  }
+
+  return true;
+}
+
+void MessageProcessor::send_response(::google::protobuf::uint32 id,
+                                     google::protobuf::MessageLite *response) {
+  VariableLengthArray<serialization_buffer_size> send_response_buffer(
+      static_cast<size_t>(response->ByteSize()));
+
+  response->SerializeWithCachedSizesToArray(send_response_buffer.data());
+
+  anbox::protobuf::rpc::Result send_response_result;
+  send_response_result.set_id(id);
+  send_response_result.set_response(send_response_buffer.data(),
+                                    send_response_buffer.size());
+
+  send_response_buffer.resize(send_response_result.ByteSize());
+  send_response_result.SerializeWithCachedSizesToArray(
+      send_response_buffer.data());
+
+  const size_t size = send_response_buffer.size();
+  const unsigned char header_bytes[header_size] = {
+      static_cast<unsigned char>((size >> 16) & 0xff),
+      static_cast<unsigned char>((size >> 8) & 0xff),
+      static_cast<unsigned char>((size >> 0) & 0xff), MessageType::response,
+  };
+
+  std::vector<std::uint8_t> send_buffer(sizeof(header_bytes) + size);
+  std::copy(header_bytes, header_bytes + sizeof(header_bytes),
+            send_buffer.begin());
+  std::copy(send_response_buffer.data(),
+            send_response_buffer.data() + send_response_buffer.size(),
+            send_buffer.begin() + sizeof(header_bytes));
+
+  sender_->send(reinterpret_cast<const char *>(send_buffer.data()),
+                send_buffer.size());
+}
+}  // namespace anbox
+}  // namespace network