...
1/*
2 * Copyright 2014 Google Inc. All rights reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17import MyGame.Example.Monster;
18import MyGame.Example.MonsterStorageGrpc;
19import MyGame.Example.Stat;
20import com.google.flatbuffers.FlatBufferBuilder;
21import io.grpc.ManagedChannel;
22import io.grpc.ManagedChannelBuilder;
23import io.grpc.Server;
24import io.grpc.ServerBuilder;
25import io.grpc.stub.StreamObserver;
26import org.junit.Assert;
27
28import java.io.IOException;
29import java.lang.InterruptedException;
30import java.nio.ByteBuffer;
31import java.util.Iterator;
32import java.util.concurrent.TimeUnit;
33import java.util.concurrent.atomic.AtomicReference;
34import java.util.concurrent.atomic.AtomicInteger;
35import java.util.concurrent.CountDownLatch;
36
37
38/**
39 * Demonstrates basic client-server interaction using grpc-java over netty.
40 */
41public class JavaGrpcTest {
42 static final String BIG_MONSTER_NAME = "Cyberdemon";
43 static final short nestedMonsterHp = 600;
44 static final short nestedMonsterMana = 1024;
45 static final int numStreamedMsgs = 10;
46 static final int timeoutMs = 3000;
47 static Server server;
48 static ManagedChannel channel;
49 static MonsterStorageGrpc.MonsterStorageBlockingStub blockingStub;
50 static MonsterStorageGrpc.MonsterStorageStub asyncStub;
51
52 static class MyService extends MonsterStorageGrpc.MonsterStorageImplBase {
53 @Override
54 public void store(Monster request, io.grpc.stub.StreamObserver<Stat> responseObserver) {
55 Assert.assertEquals(request.name(), BIG_MONSTER_NAME);
56 Assert.assertEquals(request.hp(), nestedMonsterHp);
57 Assert.assertEquals(request.mana(), nestedMonsterMana);
58 System.out.println("Received store request from " + request.name());
59 // Create a response from the incoming request name.
60 Stat stat = GameFactory.createStat("Hello " + request.name(), 100, 10);
61 responseObserver.onNext(stat);
62 responseObserver.onCompleted();
63 }
64
65 @Override
66 public void retrieve(Stat request, io.grpc.stub.StreamObserver<Monster> responseObserver) {
67 // Create 10 monsters for streaming response.
68 for (int i=0; i<numStreamedMsgs; i++) {
69 Monster monster = GameFactory.createMonsterFromStat(request, i);
70 responseObserver.onNext(monster);
71 }
72 responseObserver.onCompleted();
73 }
74
75 @Override
76 public StreamObserver<Monster> getMaxHitPoint(final StreamObserver<Stat> responseObserver) {
77 return computeMinMax(responseObserver, false);
78 }
79
80 @Override
81 public StreamObserver<Monster> getMinMaxHitPoints(final StreamObserver<Stat> responseObserver) {
82 return computeMinMax(responseObserver, true);
83 }
84
85 private StreamObserver<Monster> computeMinMax(final StreamObserver<Stat> responseObserver, final boolean includeMin) {
86 final AtomicInteger maxHp = new AtomicInteger(Integer.MIN_VALUE);
87 final AtomicReference<String> maxHpMonsterName = new AtomicReference<String>();
88 final AtomicInteger maxHpCount = new AtomicInteger();
89
90 final AtomicInteger minHp = new AtomicInteger(Integer.MAX_VALUE);
91 final AtomicReference<String> minHpMonsterName = new AtomicReference<String>();
92 final AtomicInteger minHpCount = new AtomicInteger();
93
94 return new StreamObserver<Monster>() {
95 public void onNext(Monster monster) {
96 if (monster.hp() > maxHp.get()) {
97 // Found a monster of higher hit points.
98 maxHp.set(monster.hp());
99 maxHpMonsterName.set(monster.name());
100 maxHpCount.set(1);
101 }
102 else if (monster.hp() == maxHp.get()) {
103 // Count how many times we saw a monster of current max hit points.
104 maxHpCount.getAndIncrement();
105 }
106
107 if (monster.hp() < minHp.get()) {
108 // Found a monster of a lower hit points.
109 minHp.set(monster.hp());
110 minHpMonsterName.set(monster.name());
111 minHpCount.set(1);
112 }
113 else if (monster.hp() == minHp.get()) {
114 // Count how many times we saw a monster of current min hit points.
115 minHpCount.getAndIncrement();
116 }
117 }
118 public void onCompleted() {
119 Stat maxHpStat = GameFactory.createStat(maxHpMonsterName.get(), maxHp.get(), maxHpCount.get());
120 // Send max hit points first.
121 responseObserver.onNext(maxHpStat);
122 if (includeMin) {
123 // Send min hit points.
124 Stat minHpStat = GameFactory.createStat(minHpMonsterName.get(), minHp.get(), minHpCount.get());
125 responseObserver.onNext(minHpStat);
126 }
127 responseObserver.onCompleted();
128 }
129 public void onError(Throwable t) {
130 // Not expected
131 Assert.fail();
132 };
133 };
134 }
135 }
136
137 @org.junit.BeforeClass
138 public static void startServer() throws IOException {
139 server = ServerBuilder.forPort(0).addService(new MyService()).build().start();
140 int port = server.getPort();
141 channel = ManagedChannelBuilder.forAddress("localhost", port)
142 // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
143 // needing certificates.
144 .usePlaintext()
145 .directExecutor()
146 .build();
147 blockingStub = MonsterStorageGrpc.newBlockingStub(channel);
148 asyncStub = MonsterStorageGrpc.newStub(channel);
149 }
150
151 @org.junit.Test
152 public void testUnary() throws IOException {
153 Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana);
154 Stat stat = blockingStub.store(monsterRequest);
155 Assert.assertEquals(stat.id(), "Hello " + BIG_MONSTER_NAME);
156 System.out.println("Received stat response from service: " + stat.id());
157 }
158
159 @org.junit.Test
160 public void testServerStreaming() throws IOException {
161 Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana);
162 Stat stat = blockingStub.store(monsterRequest);
163 Iterator<Monster> iterator = blockingStub.retrieve(stat);
164 int counter = 0;
165 while(iterator.hasNext()) {
166 Monster m = iterator.next();
167 System.out.println("Received monster " + m.name());
168 counter ++;
169 }
170 Assert.assertEquals(counter, numStreamedMsgs);
171 System.out.println("FlatBuffers GRPC client/server test: completed successfully");
172 }
173
174 @org.junit.Test
175 public void testClientStreaming() throws IOException, InterruptedException {
176 final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>();
177 final CountDownLatch streamAlive = new CountDownLatch(1);
178
179 StreamObserver<Stat> statObserver = new StreamObserver<Stat>() {
180 public void onCompleted() {
181 streamAlive.countDown();
182 }
183 public void onError(Throwable ex) { }
184 public void onNext(Stat stat) {
185 maxHitStat.set(stat);
186 }
187 };
188 StreamObserver<Monster> monsterStream = asyncStub.getMaxHitPoint(statObserver);
189 short count = 10;
190 for (short i = 0;i < count; ++i) {
191 Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana);
192 monsterStream.onNext(monster);
193 }
194 monsterStream.onCompleted();
195 // Wait a little bit for the server to send the stats of the monster with the max hit-points.
196 streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS);
197 Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1));
198 Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1));
199 Assert.assertEquals(maxHitStat.get().count(), 1);
200 }
201
202 @org.junit.Test
203 public void testBiDiStreaming() throws IOException, InterruptedException {
204 final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>();
205 final AtomicReference<Stat> minHitStat = new AtomicReference<Stat>();
206 final CountDownLatch streamAlive = new CountDownLatch(1);
207
208 StreamObserver<Stat> statObserver = new StreamObserver<Stat>() {
209 public void onCompleted() {
210 streamAlive.countDown();
211 }
212 public void onError(Throwable ex) { }
213 public void onNext(Stat stat) {
214 // We expect the server to send the max stat first and then the min stat.
215 if (maxHitStat.get() == null) {
216 maxHitStat.set(stat);
217 }
218 else {
219 minHitStat.set(stat);
220 }
221 }
222 };
223 StreamObserver<Monster> monsterStream = asyncStub.getMinMaxHitPoints(statObserver);
224 short count = 10;
225 for (short i = 0;i < count; ++i) {
226 Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana);
227 monsterStream.onNext(monster);
228 }
229 monsterStream.onCompleted();
230
231 // Wait a little bit for the server to send the stats of the monster with the max and min hit-points.
232 streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS);
233
234 Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1));
235 Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1));
236 Assert.assertEquals(maxHitStat.get().count(), 1);
237
238 Assert.assertEquals(minHitStat.get().id(), BIG_MONSTER_NAME + 0);
239 Assert.assertEquals(minHitStat.get().val(), nestedMonsterHp * 0);
240 Assert.assertEquals(minHitStat.get().count(), 1);
241 }
242}
View as plain text