Skip to content

Commit 9b19f24

Browse files
committed
Allow passing of factory params
1 parent ca74969 commit 9b19f24

File tree

3 files changed

+62
-6
lines changed

3 files changed

+62
-6
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.impl.streams;
20+
21+
import software.amazon.awssdk.services.s3.S3AsyncClient;
22+
23+
/**
24+
* Factory parameters class to hold various parameters needed for stream creation.
25+
*/
26+
public class FactoryParams {
27+
private final S3AsyncClient s3AsyncClient;
28+
29+
private FactoryParams(Builder builder) {
30+
this.s3AsyncClient = builder.s3AsyncClient;
31+
}
32+
33+
public S3AsyncClient getS3AsyncClient() {
34+
return s3AsyncClient;
35+
}
36+
37+
public static class Builder {
38+
private S3AsyncClient s3AsyncClient;
39+
40+
public Builder withS3AsyncClient(S3AsyncClient s3AsyncClient) {
41+
this.s3AsyncClient = s3AsyncClient;
42+
return this;
43+
}
44+
45+
public FactoryParams build() {
46+
return new FactoryParams(this);
47+
}
48+
}
49+
50+
public static Builder builder() {
51+
return new Builder();
52+
}
53+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import org.apache.hadoop.conf.Configuration;
2525
import org.apache.hadoop.fs.s3a.prefetch.PrefetchingInputStreamFactory;
26-
import software.amazon.awssdk.services.s3.S3AsyncClient;
2726

2827
/**
2928
* Enum of input stream types.
@@ -45,14 +44,14 @@ public enum InputStreamType {
4544
/**
4645
* The analytics input stream.
4746
*/
48-
Analytics("analytics", (c, s3AsyncClient) -> new S3ASeekableInputStreamFactory(s3AsyncClient));
47+
Analytics("analytics", (c, factoryParams) -> new S3ASeekableInputStreamFactory(factoryParams.getS3AsyncClient()));
4948

5049
/**
5150
* Name.
5251
*/
5352
private final String name;
5453

55-
private final BiFunction<Configuration, S3AsyncClient, ObjectInputStreamFactory> factory;
54+
private final BiFunction<Configuration, FactoryParams, ObjectInputStreamFactory> factory;
5655
/**
5756
* String name.
5857
* @return the name
@@ -65,7 +64,7 @@ public String getName() {
6564
this(name, (c, s) -> factory.apply(c));
6665
}
6766

68-
InputStreamType(String name, BiFunction<Configuration, S3AsyncClient, ObjectInputStreamFactory> factory) {
67+
InputStreamType(String name, BiFunction<Configuration, FactoryParams, ObjectInputStreamFactory> factory) {
6968
this.name = name;
7069
this.factory = factory;
7170
}
@@ -74,7 +73,7 @@ public String getName() {
7473
* Factory constructor.
7574
* @return the factory associated with this stream type.
7675
*/
77-
public BiFunction<Configuration, S3AsyncClient, ObjectInputStreamFactory> factory() {
76+
public BiFunction<Configuration, FactoryParams, ObjectInputStreamFactory> factory() {
7877
return factory;
7978
}
8079

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,17 @@ public static ObjectInputStreamFactory createStreamFactory(final Configuration c
6363

6464
/**
6565
* Create the s3 seekable input stream factory.
66+
* @param conf configuration
6667
* @param s3AsyncClient s3 async client
6768
* @return a stream factory.
6869
*/
6970
public static ObjectInputStreamFactory createStreamFactory(final Configuration conf, final S3AsyncClient s3AsyncClient) {
71+
FactoryParams factoryParams = FactoryParams.builder()
72+
.withS3AsyncClient(s3AsyncClient)
73+
.build();
7074
InputStreamType defaultStream = InputStreamType.DEFAULT_STREAM_TYPE;
7175
return conf.getEnum(INPUT_STREAM_TYPE, defaultStream)
7276
.factory()
73-
.apply(conf, s3AsyncClient); }
77+
.apply(conf, factoryParams); }
7478

7579
}

0 commit comments

Comments
 (0)