...

Text file src/github.com/google/flatbuffers/grpc/tests/JavaGrpcTest.java

Documentation: github.com/google/flatbuffers/grpc/tests

     1/*
     2 * Copyright 2014 Google Inc. All rights reserved.
     3 *
     4 * Licensed under the Apache License, Version 2.0 (the "License");
     5 * you may not use this file except in compliance with the License.
     6 * You may obtain a copy of the License at
     7 *
     8 *     http://www.apache.org/licenses/LICENSE-2.0
     9 *
    10 * Unless required by applicable law or agreed to in writing, software
    11 * distributed under the License is distributed on an "AS IS" BASIS,
    12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13 * See the License for the specific language governing permissions and
    14 * limitations under the License.
    15 */
    16
    17import MyGame.Example.Monster;
    18import MyGame.Example.MonsterStorageGrpc;
    19import MyGame.Example.Stat;
    20import com.google.flatbuffers.FlatBufferBuilder;
    21import io.grpc.ManagedChannel;
    22import io.grpc.ManagedChannelBuilder;
    23import io.grpc.Server;
    24import io.grpc.ServerBuilder;
    25import io.grpc.stub.StreamObserver;
    26import org.junit.Assert;
    27
    28import java.io.IOException;
    29import java.lang.InterruptedException;
    30import java.nio.ByteBuffer;
    31import java.util.Iterator;
    32import java.util.concurrent.TimeUnit;
    33import java.util.concurrent.atomic.AtomicReference;
    34import java.util.concurrent.atomic.AtomicInteger;
    35import java.util.concurrent.CountDownLatch;
    36
    37
    38/**
    39 * Demonstrates basic client-server interaction using grpc-java over netty.
    40 */
    41public class JavaGrpcTest {
    42    static final String BIG_MONSTER_NAME = "Cyberdemon";
    43    static final short nestedMonsterHp = 600;
    44    static final short nestedMonsterMana = 1024;
    45    static final int numStreamedMsgs = 10;
    46    static final int timeoutMs = 3000;
    47    static Server server;
    48    static ManagedChannel channel;
    49    static MonsterStorageGrpc.MonsterStorageBlockingStub blockingStub;
    50    static MonsterStorageGrpc.MonsterStorageStub asyncStub;
    51
    52    static class MyService extends MonsterStorageGrpc.MonsterStorageImplBase {
    53        @Override
    54        public void store(Monster request, io.grpc.stub.StreamObserver<Stat> responseObserver) {
    55            Assert.assertEquals(request.name(), BIG_MONSTER_NAME);
    56            Assert.assertEquals(request.hp(), nestedMonsterHp);
    57            Assert.assertEquals(request.mana(), nestedMonsterMana);
    58            System.out.println("Received store request from " + request.name());
    59            // Create a response from the incoming request name.
    60            Stat stat = GameFactory.createStat("Hello " + request.name(), 100, 10);
    61            responseObserver.onNext(stat);
    62            responseObserver.onCompleted();
    63        }
    64
    65        @Override
    66        public void retrieve(Stat request, io.grpc.stub.StreamObserver<Monster> responseObserver) {
    67            // Create 10 monsters for streaming response.
    68            for (int i=0; i<numStreamedMsgs; i++) {
    69                Monster monster = GameFactory.createMonsterFromStat(request, i);
    70                responseObserver.onNext(monster);
    71            }
    72            responseObserver.onCompleted();
    73        }
    74
    75        @Override
    76        public StreamObserver<Monster> getMaxHitPoint(final StreamObserver<Stat> responseObserver) {
    77          return computeMinMax(responseObserver, false);
    78        }
    79
    80        @Override
    81        public StreamObserver<Monster> getMinMaxHitPoints(final StreamObserver<Stat> responseObserver) {
    82          return computeMinMax(responseObserver, true);
    83        }
    84
    85        private StreamObserver<Monster> computeMinMax(final StreamObserver<Stat> responseObserver, final boolean includeMin) {
    86          final AtomicInteger maxHp = new AtomicInteger(Integer.MIN_VALUE);
    87          final AtomicReference<String> maxHpMonsterName = new AtomicReference<String>();
    88          final AtomicInteger maxHpCount = new AtomicInteger();
    89
    90          final AtomicInteger minHp = new AtomicInteger(Integer.MAX_VALUE);
    91          final AtomicReference<String> minHpMonsterName = new AtomicReference<String>();
    92          final AtomicInteger minHpCount = new AtomicInteger();
    93
    94          return new StreamObserver<Monster>() {
    95            public void onNext(Monster monster) {
    96              if (monster.hp() > maxHp.get()) {
    97                // Found a monster of higher hit points.
    98                maxHp.set(monster.hp());
    99                maxHpMonsterName.set(monster.name());
   100                maxHpCount.set(1);
   101              }
   102              else if (monster.hp() == maxHp.get()) {
   103                // Count how many times we saw a monster of current max hit points.
   104                maxHpCount.getAndIncrement();
   105              }
   106
   107              if (monster.hp() < minHp.get()) {
   108                // Found a monster of a lower hit points.
   109                minHp.set(monster.hp());
   110                minHpMonsterName.set(monster.name());
   111                minHpCount.set(1);
   112              }
   113              else if (monster.hp() == minHp.get()) {
   114                // Count how many times we saw a monster of current min hit points.
   115                minHpCount.getAndIncrement();
   116              }
   117            }
   118            public void onCompleted() {
   119              Stat maxHpStat = GameFactory.createStat(maxHpMonsterName.get(), maxHp.get(), maxHpCount.get());
   120              // Send max hit points first.
   121              responseObserver.onNext(maxHpStat);
   122              if (includeMin) {
   123                // Send min hit points.
   124                Stat minHpStat = GameFactory.createStat(minHpMonsterName.get(), minHp.get(), minHpCount.get());
   125                responseObserver.onNext(minHpStat);
   126              }
   127              responseObserver.onCompleted();
   128            }
   129            public void onError(Throwable t) {
   130              // Not expected
   131              Assert.fail();
   132            };
   133          };
   134        }
   135    }
   136
   137    @org.junit.BeforeClass
   138    public static void startServer() throws IOException {
   139        server = ServerBuilder.forPort(0).addService(new MyService()).build().start();
   140        int port = server.getPort();
   141        channel = ManagedChannelBuilder.forAddress("localhost", port)
   142                // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
   143                // needing certificates.
   144                .usePlaintext()
   145                .directExecutor()
   146                .build();
   147        blockingStub = MonsterStorageGrpc.newBlockingStub(channel);
   148        asyncStub = MonsterStorageGrpc.newStub(channel);
   149    }
   150
   151    @org.junit.Test
   152    public void testUnary() throws IOException {
   153        Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana);
   154        Stat stat = blockingStub.store(monsterRequest);
   155        Assert.assertEquals(stat.id(), "Hello " + BIG_MONSTER_NAME);
   156        System.out.println("Received stat response from service: " + stat.id());
   157    }
   158
   159    @org.junit.Test
   160    public void testServerStreaming() throws IOException {
   161        Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana);
   162        Stat stat = blockingStub.store(monsterRequest);
   163        Iterator<Monster> iterator = blockingStub.retrieve(stat);
   164        int counter = 0;
   165        while(iterator.hasNext()) {
   166            Monster m = iterator.next();
   167            System.out.println("Received monster " + m.name());
   168            counter ++;
   169        }
   170        Assert.assertEquals(counter, numStreamedMsgs);
   171        System.out.println("FlatBuffers GRPC client/server test: completed successfully");
   172    }
   173
   174    @org.junit.Test
   175    public void testClientStreaming() throws IOException, InterruptedException {
   176      final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>();
   177      final CountDownLatch streamAlive = new CountDownLatch(1);
   178
   179      StreamObserver<Stat> statObserver = new StreamObserver<Stat>() {
   180        public void onCompleted() {
   181          streamAlive.countDown();
   182        }
   183        public void onError(Throwable ex) { }
   184        public void onNext(Stat stat) {
   185          maxHitStat.set(stat);
   186        }
   187      };
   188      StreamObserver<Monster> monsterStream = asyncStub.getMaxHitPoint(statObserver);
   189      short count = 10;
   190      for (short i = 0;i < count; ++i) {
   191        Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana);
   192        monsterStream.onNext(monster);
   193      }
   194      monsterStream.onCompleted();
   195      // Wait a little bit for the server to send the stats of the monster with the max hit-points.
   196      streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS);
   197      Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1));
   198      Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1));
   199      Assert.assertEquals(maxHitStat.get().count(), 1);
   200    }
   201
   202    @org.junit.Test
   203    public void testBiDiStreaming() throws IOException, InterruptedException {
   204      final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>();
   205      final AtomicReference<Stat> minHitStat = new AtomicReference<Stat>();
   206      final CountDownLatch streamAlive = new CountDownLatch(1);
   207
   208      StreamObserver<Stat> statObserver = new StreamObserver<Stat>() {
   209        public void onCompleted() {
   210          streamAlive.countDown();
   211        }
   212        public void onError(Throwable ex) { }
   213        public void onNext(Stat stat) {
   214          // We expect the server to send the max stat first and then the min stat.
   215          if (maxHitStat.get() == null) {
   216            maxHitStat.set(stat);
   217          }
   218          else {
   219            minHitStat.set(stat);
   220          }
   221        }
   222      };
   223      StreamObserver<Monster> monsterStream = asyncStub.getMinMaxHitPoints(statObserver);
   224      short count = 10;
   225      for (short i = 0;i < count; ++i) {
   226        Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana);
   227        monsterStream.onNext(monster);
   228      }
   229      monsterStream.onCompleted();
   230
   231      // Wait a little bit for the server to send the stats of the monster with the max and min hit-points.
   232      streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS);
   233
   234      Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1));
   235      Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1));
   236      Assert.assertEquals(maxHitStat.get().count(), 1);
   237
   238      Assert.assertEquals(minHitStat.get().id(), BIG_MONSTER_NAME + 0);
   239      Assert.assertEquals(minHitStat.get().val(), nestedMonsterHp * 0);
   240      Assert.assertEquals(minHitStat.get().count(), 1);
   241    }
   242}

View as plain text