...

Text file src/github.com/google/flatbuffers/grpc/tests/grpctest.cpp

Documentation: github.com/google/flatbuffers/grpc/tests

     1/*
     2 * Copyright 2014 Google Inc. All rights reserved.
     3 *
     4 * Licensed under the Apache License, Version 2.0 (the "License");
     5 * you may not use this file except in compliance with the License.
     6 * You may obtain a copy of the License at
     7 *
     8 *     http://www.apache.org/licenses/LICENSE-2.0
     9 *
    10 * Unless required by applicable law or agreed to in writing, software
    11 * distributed under the License is distributed on an "AS IS" BASIS,
    12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13 * See the License for the specific language governing permissions and
    14 * limitations under the License.
    15 */
    16
#include <grpcpp/grpcpp.h>

#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

#include "monster_test.grpc.fb.h"
#include "monster_test_generated.h"
#include "test_assert.h"
    25
    26using namespace MyGame::Example;
    27using flatbuffers::FlatBufferBuilder;
    28using flatbuffers::grpc::MessageBuilder;
    29
    30void message_builder_tests();
    31
    32// The callback implementation of our server, that derives from the generated
    33// code. It implements all rpcs specified in the FlatBuffers schema.
    34class ServiceImpl final : public MyGame::Example::MonsterStorage::Service {
    35  virtual ::grpc::Status Store(
    36      ::grpc::ServerContext *context,
    37      const flatbuffers::grpc::Message<Monster> *request,
    38      flatbuffers::grpc::Message<Stat> *response) override {
    39    // Create a response from the incoming request name.
    40    fbb_.Clear();
    41    auto stat_offset = CreateStat(
    42        fbb_, fbb_.CreateString("Hello, " + request->GetRoot()->name()->str()));
    43    fbb_.Finish(stat_offset);
    44    // Transfer ownership of the message to gRPC
    45    *response = fbb_.ReleaseMessage<Stat>();
    46    return grpc::Status::OK;
    47  }
    48  virtual ::grpc::Status Retrieve(
    49      ::grpc::ServerContext *context,
    50      const flatbuffers::grpc::Message<Stat> *request,
    51      ::grpc::ServerWriter<flatbuffers::grpc::Message<Monster>> *writer)
    52      override {
    53    for (int i = 0; i < 5; i++) {
    54      fbb_.Clear();
    55      // Create 5 monsters for resposne.
    56      auto monster_offset =
    57          CreateMonster(fbb_, 0, 0, 0,
    58                        fbb_.CreateString(request->GetRoot()->id()->str() +
    59                                          " No." + std::to_string(i)));
    60      fbb_.Finish(monster_offset);
    61
    62      flatbuffers::grpc::Message<Monster> monster =
    63          fbb_.ReleaseMessage<Monster>();
    64
    65      // Send monster to client using streaming.
    66      writer->Write(monster);
    67    }
    68    return grpc::Status::OK;
    69  }
    70
    71 private:
    72  flatbuffers::grpc::MessageBuilder fbb_;
    73};
    74
    75// Track the server instance, so we can terminate it later.
    76grpc::Server *server_instance = nullptr;
    77// Mutex to protec this variable.
    78std::mutex wait_for_server;
    79std::condition_variable server_instance_cv;
    80
    81// This function implements the server thread.
    82void RunServer() {
    83  auto server_address = "0.0.0.0:50051";
    84  // Callback interface we implemented above.
    85  ServiceImpl service;
    86  grpc::ServerBuilder builder;
    87  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    88  builder.RegisterService(&service);
    89
    90  // Start the server. Lock to change the variable we're changing.
    91  wait_for_server.lock();
    92  server_instance = builder.BuildAndStart().release();
    93  wait_for_server.unlock();
    94  server_instance_cv.notify_one();
    95
    96  std::cout << "Server listening on " << server_address << std::endl;
    97  // This will block the thread and serve requests.
    98  server_instance->Wait();
    99}
   100
   101template<class Builder> void StoreRPC(MonsterStorage::Stub *stub) {
   102  Builder fbb;
   103  grpc::ClientContext context;
   104  // Build a request with the name set.
   105  auto monster_offset = CreateMonster(fbb, 0, 0, 0, fbb.CreateString("Fred"));
   106  MessageBuilder mb(std::move(fbb));
   107  mb.Finish(monster_offset);
   108  auto request = mb.ReleaseMessage<Monster>();
   109  flatbuffers::grpc::Message<Stat> response;
   110
   111  // The actual RPC.
   112  auto status = stub->Store(&context, request, &response);
   113
   114  if (status.ok()) {
   115    auto resp = response.GetRoot()->id();
   116    std::cout << "RPC response: " << resp->str() << std::endl;
   117  } else {
   118    std::cout << "RPC failed" << std::endl;
   119  }
   120}
   121
   122template<class Builder> void RetrieveRPC(MonsterStorage::Stub *stub) {
   123  Builder fbb;
   124  grpc::ClientContext context;
   125  fbb.Clear();
   126  auto stat_offset = CreateStat(fbb, fbb.CreateString("Fred"));
   127  fbb.Finish(stat_offset);
   128  auto request = MessageBuilder(std::move(fbb)).ReleaseMessage<Stat>();
   129
   130  flatbuffers::grpc::Message<Monster> response;
   131  auto stream = stub->Retrieve(&context, request);
   132  while (stream->Read(&response)) {
   133    auto resp = response.GetRoot()->name();
   134    std::cout << "RPC Streaming response: " << resp->str() << std::endl;
   135  }
   136}
   137
   138int grpc_server_test() {
   139  // Launch server.
   140  std::thread server_thread(RunServer);
   141
   142  // wait for server to spin up.
   143  std::unique_lock<std::mutex> lock(wait_for_server);
   144  while (!server_instance) server_instance_cv.wait(lock);
   145
   146  // Now connect the client.
   147  auto channel = grpc::CreateChannel("localhost:50051",
   148                                     grpc::InsecureChannelCredentials());
   149  auto stub = MyGame::Example::MonsterStorage::NewStub(channel);
   150
   151  StoreRPC<MessageBuilder>(stub.get());
   152  StoreRPC<FlatBufferBuilder>(stub.get());
   153
   154  RetrieveRPC<MessageBuilder>(stub.get());
   155  RetrieveRPC<FlatBufferBuilder>(stub.get());
   156
   157#if !FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION
   158  {
   159    // Test that an invalid request errors out correctly
   160    grpc::ClientContext context;
   161    flatbuffers::grpc::Message<Monster> request;  // simulate invalid message
   162    flatbuffers::grpc::Message<Stat> response;
   163    auto status = stub->Store(&context, request, &response);
   164    // The rpc status should be INTERNAL to indicate a verification error. This
   165    // matches the protobuf gRPC status code for an unparseable message.
   166    assert(!status.ok());
   167    assert(status.error_code() == ::grpc::StatusCode::INTERNAL);
   168    assert(strcmp(status.error_message().c_str(),
   169                  "Message verification failed") == 0);
   170  }
   171#endif
   172
   173  server_instance->Shutdown();
   174
   175  server_thread.join();
   176
   177  delete server_instance;
   178
   179  return 0;
   180}
   181
   182int main(int /*argc*/, const char * /*argv*/[]) {
   183  message_builder_tests();
   184  grpc_server_test();
   185
   186  if (!testing_fails) {
   187    TEST_OUTPUT_LINE("ALL TESTS PASSED");
   188    return 0;
   189  } else {
   190    TEST_OUTPUT_LINE("%d FAILED TESTS", testing_fails);
   191    return 1;
   192  }
   193}

View as plain text