20
20
21
21
import com .google .common .annotations .VisibleForTesting ;
22
22
import com .google .common .base .Preconditions ;
23
- import com .google .common .util .concurrent .MoreExecutors ;
24
23
import io .grpc .Attributes ;
25
24
import io .grpc .BinaryLog ;
26
25
import io .grpc .ClientInterceptor ;
38
37
import java .net .URI ;
39
38
import java .net .URISyntaxException ;
40
39
import java .util .ArrayList ;
41
- import java .util .Arrays ;
42
40
import java .util .Collections ;
43
- import java .util .LinkedHashMap ;
44
41
import java .util .List ;
45
42
import java .util .Map ;
46
43
import java .util .concurrent .Executor ;
@@ -86,13 +83,13 @@ public static ManagedChannelBuilder<?> forTarget(String target) {
86
83
*/
87
84
static final long IDLE_MODE_MIN_TIMEOUT_MILLIS = TimeUnit .SECONDS .toMillis (1 );
88
85
89
- private static final ObjectPool <? extends Executor > DEFAULT_EXECUTOR_POOL =
86
+ protected static final ObjectPool <? extends Executor > DEFAULT_EXECUTOR_POOL =
90
87
SharedResourcePool .forResource (GrpcUtil .SHARED_CHANNEL_EXECUTOR );
91
88
92
- private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY =
89
+ protected static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY =
93
90
DecompressorRegistry .getDefaultInstance ();
94
91
95
- private static final CompressorRegistry DEFAULT_COMPRESSOR_REGISTRY =
92
+ protected static final CompressorRegistry DEFAULT_COMPRESSOR_REGISTRY =
96
93
CompressorRegistry .getDefaultInstance ();
97
94
98
95
private static final long DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES = 1L << 24 ; // 16M
@@ -102,16 +99,16 @@ public static ManagedChannelBuilder<?> forTarget(String target) {
102
99
103
100
ObjectPool <? extends Executor > offloadExecutorPool = DEFAULT_EXECUTOR_POOL ;
104
101
105
- private final List <ClientInterceptor > interceptors = new ArrayList <>();
102
+ protected final List <ClientInterceptor > interceptors = new ArrayList <>();
106
103
final NameResolverRegistry nameResolverRegistry = NameResolverRegistry .getDefaultRegistry ();
107
104
108
105
// Access via getter, which may perform authority override as needed
109
- private NameResolver .Factory nameResolverFactory = nameResolverRegistry .asFactory ();
106
+ protected NameResolver .Factory nameResolverFactory = nameResolverRegistry .asFactory ();
110
107
111
108
final String target ;
112
109
113
110
@ Nullable
114
- private final SocketAddress directServerAddress ;
111
+ protected final SocketAddress directServerAddress ;
115
112
116
113
@ Nullable
117
114
String userAgent ;
@@ -174,11 +171,11 @@ protected final int maxInboundMessageSize() {
174
171
return maxInboundMessageSize ;
175
172
}
176
173
177
- private boolean statsEnabled = true ;
178
- private boolean recordStartedRpcs = true ;
179
- private boolean recordFinishedRpcs = true ;
180
- private boolean recordRealTimeMetrics = false ;
181
- private boolean tracingEnabled = true ;
174
+ protected boolean statsEnabled = true ;
175
+ protected boolean recordStartedRpcs = true ;
176
+ protected boolean recordFinishedRpcs = true ;
177
+ protected boolean recordRealTimeMetrics = false ;
178
+ protected boolean tracingEnabled = true ;
182
179
183
180
protected AbstractManagedChannelImplBuilder (String target ) {
184
181
this .target = Preconditions .checkNotNull (target , "target" );
@@ -206,306 +203,17 @@ protected AbstractManagedChannelImplBuilder(SocketAddress directServerAddress, S
206
203
this .nameResolverFactory = new DirectAddressNameResolverFactory (directServerAddress , authority );
207
204
}
208
205
209
- @ Override
210
- public final T directExecutor () {
211
- return executor (MoreExecutors .directExecutor ());
212
- }
213
-
214
- @ Override
215
- public final T executor (Executor executor ) {
216
- if (executor != null ) {
217
- this .executorPool = new FixedObjectPool <>(executor );
218
- } else {
219
- this .executorPool = DEFAULT_EXECUTOR_POOL ;
220
- }
221
- return thisT ();
222
- }
223
-
224
- @ Override
225
- public final T offloadExecutor (Executor executor ) {
226
- if (executor != null ) {
227
- this .offloadExecutorPool = new FixedObjectPool <>(executor );
228
- } else {
229
- this .offloadExecutorPool = DEFAULT_EXECUTOR_POOL ;
230
- }
231
- return thisT ();
232
- }
233
-
234
- @ Override
235
- public final T intercept (List <ClientInterceptor > interceptors ) {
236
- this .interceptors .addAll (interceptors );
237
- return thisT ();
238
- }
239
-
240
- @ Override
241
- public final T intercept (ClientInterceptor ... interceptors ) {
242
- return intercept (Arrays .asList (interceptors ));
243
- }
244
-
245
- @ Deprecated
246
- @ Override
247
- public final T nameResolverFactory (NameResolver .Factory resolverFactory ) {
248
- Preconditions .checkState (directServerAddress == null ,
249
- "directServerAddress is set (%s), which forbids the use of NameResolverFactory" ,
250
- directServerAddress );
251
- if (resolverFactory != null ) {
252
- this .nameResolverFactory = resolverFactory ;
253
- } else {
254
- this .nameResolverFactory = nameResolverRegistry .asFactory ();
255
- }
256
- return thisT ();
257
- }
258
-
259
- @ Override
260
- public final T defaultLoadBalancingPolicy (String policy ) {
261
- Preconditions .checkState (directServerAddress == null ,
262
- "directServerAddress is set (%s), which forbids the use of load-balancing policy" ,
263
- directServerAddress );
264
- Preconditions .checkArgument (policy != null , "policy cannot be null" );
265
- this .defaultLbPolicy = policy ;
266
- return thisT ();
267
- }
268
-
269
- @ Override
270
- public final T enableFullStreamDecompression () {
271
- this .fullStreamDecompression = true ;
272
- return thisT ();
273
- }
274
-
275
- @ Override
276
- public final T decompressorRegistry (DecompressorRegistry registry ) {
277
- if (registry != null ) {
278
- this .decompressorRegistry = registry ;
279
- } else {
280
- this .decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY ;
281
- }
282
- return thisT ();
283
- }
284
-
285
- @ Override
286
- public final T compressorRegistry (CompressorRegistry registry ) {
287
- if (registry != null ) {
288
- this .compressorRegistry = registry ;
289
- } else {
290
- this .compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY ;
291
- }
292
- return thisT ();
293
- }
294
-
295
- @ Override
296
- public final T userAgent (@ Nullable String userAgent ) {
297
- this .userAgent = userAgent ;
298
- return thisT ();
299
- }
300
-
301
- @ Override
302
- public final T overrideAuthority (String authority ) {
303
- this .authorityOverride = checkAuthority (authority );
304
- return thisT ();
305
- }
306
-
307
- @ Override
308
- public final T idleTimeout (long value , TimeUnit unit ) {
309
- checkArgument (value > 0 , "idle timeout is %s, but must be positive" , value );
310
- // We convert to the largest unit to avoid overflow
311
- if (unit .toDays (value ) >= IDLE_MODE_MAX_TIMEOUT_DAYS ) {
312
- // This disables idle mode
313
- this .idleTimeoutMillis = ManagedChannelImpl .IDLE_TIMEOUT_MILLIS_DISABLE ;
314
- } else {
315
- this .idleTimeoutMillis = Math .max (unit .toMillis (value ), IDLE_MODE_MIN_TIMEOUT_MILLIS );
316
- }
317
- return thisT ();
318
- }
319
-
320
- @ Override
321
- public final T maxRetryAttempts (int maxRetryAttempts ) {
322
- this .maxRetryAttempts = maxRetryAttempts ;
323
- return thisT ();
324
- }
325
-
326
- @ Override
327
- public final T maxHedgedAttempts (int maxHedgedAttempts ) {
328
- this .maxHedgedAttempts = maxHedgedAttempts ;
329
- return thisT ();
330
- }
331
-
332
- @ Override
333
- public final T retryBufferSize (long bytes ) {
334
- checkArgument (bytes > 0L , "retry buffer size must be positive" );
335
- retryBufferSize = bytes ;
336
- return thisT ();
337
- }
338
-
339
- @ Override
340
- public final T perRpcBufferLimit (long bytes ) {
341
- checkArgument (bytes > 0L , "per RPC buffer limit must be positive" );
342
- perRpcBufferLimit = bytes ;
343
- return thisT ();
344
- }
345
-
346
- @ Override
347
- public final T disableRetry () {
348
- retryEnabled = false ;
349
- return thisT ();
350
- }
351
-
352
- @ Override
353
- public final T enableRetry () {
354
- retryEnabled = true ;
355
- statsEnabled = false ;
356
- tracingEnabled = false ;
357
- return thisT ();
358
- }
359
-
360
- @ Override
361
- public final T setBinaryLog (BinaryLog binlog ) {
362
- this .binlog = binlog ;
363
- return thisT ();
364
- }
365
-
366
- @ Override
367
- public T maxTraceEvents (int maxTraceEvents ) {
368
- checkArgument (maxTraceEvents >= 0 , "maxTraceEvents must be non-negative" );
369
- this .maxTraceEvents = maxTraceEvents ;
370
- return thisT ();
371
- }
372
-
373
- @ Override
374
- public T proxyDetector (@ Nullable ProxyDetector proxyDetector ) {
375
- this .proxyDetector = proxyDetector ;
376
- return thisT ();
377
- }
378
-
379
- @ Override
380
- public T defaultServiceConfig (@ Nullable Map <String , ?> serviceConfig ) {
381
- // TODO(notcarl): use real parsing
382
- defaultServiceConfig = checkMapEntryTypes (serviceConfig );
383
- return thisT ();
384
- }
385
-
386
- @ Nullable
387
- private static Map <String , ?> checkMapEntryTypes (@ Nullable Map <?, ?> map ) {
388
- if (map == null ) {
389
- return null ;
390
- }
391
- // Not using ImmutableMap.Builder because of extra guava dependency for Android.
392
- Map <String , Object > parsedMap = new LinkedHashMap <>();
393
- for (Map .Entry <?, ?> entry : map .entrySet ()) {
394
- checkArgument (
395
- entry .getKey () instanceof String ,
396
- "The key of the entry '%s' is not of String type" , entry );
397
-
398
- String key = (String ) entry .getKey ();
399
- Object value = entry .getValue ();
400
- if (value == null ) {
401
- parsedMap .put (key , null );
402
- } else if (value instanceof Map ) {
403
- parsedMap .put (key , checkMapEntryTypes ((Map <?, ?>) value ));
404
- } else if (value instanceof List ) {
405
- parsedMap .put (key , checkListEntryTypes ((List <?>) value ));
406
- } else if (value instanceof String ) {
407
- parsedMap .put (key , value );
408
- } else if (value instanceof Double ) {
409
- parsedMap .put (key , value );
410
- } else if (value instanceof Boolean ) {
411
- parsedMap .put (key , value );
412
- } else {
413
- throw new IllegalArgumentException (
414
- "The value of the map entry '" + entry + "' is of type '" + value .getClass ()
415
- + "', which is not supported" );
416
- }
417
- }
418
- return Collections .unmodifiableMap (parsedMap );
419
- }
420
-
421
- private static List <?> checkListEntryTypes (List <?> list ) {
422
- List <Object > parsedList = new ArrayList <>(list .size ());
423
- for (Object value : list ) {
424
- if (value == null ) {
425
- parsedList .add (null );
426
- } else if (value instanceof Map ) {
427
- parsedList .add (checkMapEntryTypes ((Map <?, ?>) value ));
428
- } else if (value instanceof List ) {
429
- parsedList .add (checkListEntryTypes ((List <?>) value ));
430
- } else if (value instanceof String ) {
431
- parsedList .add (value );
432
- } else if (value instanceof Double ) {
433
- parsedList .add (value );
434
- } else if (value instanceof Boolean ) {
435
- parsedList .add (value );
436
- } else {
437
- throw new IllegalArgumentException (
438
- "The entry '" + value + "' is of type '" + value .getClass ()
439
- + "', which is not supported" );
440
- }
441
- }
442
- return Collections .unmodifiableList (parsedList );
443
- }
444
-
445
206
@ Override
446
207
public T disableServiceConfigLookUp () {
447
208
this .lookUpServiceConfig = false ;
448
209
return thisT ();
449
210
}
450
211
451
- /**
452
- * Disable or enable stats features. Enabled by default.
453
- *
454
- * <p>For the current release, calling {@code setStatsEnabled(true)} may have a side effect that
455
- * disables retry.
456
- */
457
- protected void setStatsEnabled (boolean value ) {
458
- statsEnabled = value ;
459
- }
460
-
461
- /**
462
- * Disable or enable stats recording for RPC upstarts. Effective only if {@link
463
- * #setStatsEnabled} is set to true. Enabled by default.
464
- */
465
- protected void setStatsRecordStartedRpcs (boolean value ) {
466
- recordStartedRpcs = value ;
467
- }
468
-
469
- /**
470
- * Disable or enable stats recording for RPC completions. Effective only if {@link
471
- * #setStatsEnabled} is set to true. Enabled by default.
472
- */
473
- protected void setStatsRecordFinishedRpcs (boolean value ) {
474
- recordFinishedRpcs = value ;
475
- }
476
-
477
- /**
478
- * Disable or enable real-time metrics recording. Effective only if {@link #setStatsEnabled} is
479
- * set to true. Disabled by default.
480
- */
481
- protected void setStatsRecordRealTimeMetrics (boolean value ) {
482
- recordRealTimeMetrics = value ;
483
- }
484
-
485
- /**
486
- * Disable or enable tracing features. Enabled by default.
487
- *
488
- * <p>For the current release, calling {@code setTracingEnabled(true)} may have a side effect that
489
- * disables retry.
490
- */
491
- protected void setTracingEnabled (boolean value ) {
492
- tracingEnabled = value ;
493
- }
494
-
495
212
@ VisibleForTesting
496
213
final long getIdleTimeoutMillis () {
497
214
return idleTimeoutMillis ;
498
215
}
499
216
500
- /**
501
- * Verifies the authority is valid. This method exists as an escape hatch for putting in an
502
- * authority that is valid, but would fail the default validation provided by this
503
- * implementation.
504
- */
505
- protected String checkAuthority (String authority ) {
506
- return GrpcUtil .checkAuthority (authority );
507
- }
508
-
509
217
// Temporarily disable retry when stats or tracing is enabled to avoid breakage, until we know
510
218
// what should be the desired behavior for retry + stats/tracing.
511
219
// TODO(zdapeng): FIX IT
0 commit comments