1919package org .apache .hadoop .fs .s3a .commit ;
2020
2121import java .io .IOException ;
22+ import java .util .Arrays ;
23+ import java .util .Collection ;
2224
2325import org .junit .Test ;
26+ import org .junit .runner .RunWith ;
27+ import org .junit .runners .Parameterized ;
28+ import org .slf4j .Logger ;
29+ import org .slf4j .LoggerFactory ;
2430
2531import org .apache .hadoop .conf .Configuration ;
32+ import org .apache .hadoop .fs .FileSystem ;
2633import org .apache .hadoop .fs .Path ;
34+ import org .apache .hadoop .fs .PathIOException ;
2735import org .apache .hadoop .fs .s3a .commit .magic .MagicS3GuardCommitter ;
2836import org .apache .hadoop .fs .s3a .commit .staging .DirectoryStagingCommitter ;
2937import org .apache .hadoop .fs .s3a .commit .staging .PartitionedStagingCommitter ;
3038import org .apache .hadoop .fs .s3a .commit .staging .StagingCommitter ;
39+ import org .apache .hadoop .mapred .JobConf ;
3140import org .apache .hadoop .mapreduce .MRJobConfig ;
3241import org .apache .hadoop .mapreduce .TaskAttemptContext ;
3342import org .apache .hadoop .mapreduce .TaskAttemptID ;
3443import org .apache .hadoop .mapreduce .lib .output .FileOutputCommitter ;
3544import org .apache .hadoop .mapreduce .lib .output .FileOutputFormat ;
3645import org .apache .hadoop .mapreduce .lib .output .PathOutputCommitter ;
3746import org .apache .hadoop .mapreduce .task .TaskAttemptContextImpl ;
38- import org .apache .hadoop .test . LambdaTestUtils ;
47+ import org .apache .hadoop .security . UserGroupInformation ;
3948
49+ import static org .apache .hadoop .fs .s3a .S3ATestUtils .removeBaseAndBucketOverrides ;
4050import static org .apache .hadoop .fs .s3a .commit .CommitConstants .*;
51+ import static org .apache .hadoop .fs .s3a .commit .InternalCommitterConstants .COMMITTER_NAME_STAGING ;
52+ import static org .apache .hadoop .test .LambdaTestUtils .intercept ;
4153
4254/**
43- * Tests for some aspects of the committer factory.
44- * All tests are grouped into one single test so that only one
45- * S3A FS client is set up and used for the entire run.
46- * Saves time and money.
55+ * Tests for the committer factory creation/override process.
4756 */
48- public class ITestS3ACommitterFactory extends AbstractCommitITest {
49-
50-
51- protected static final String INVALID_NAME = "invalid-name" ;
57+ @ RunWith (Parameterized .class )
58+ public final class ITestS3ACommitterFactory extends AbstractCommitITest {
59+ private static final Logger LOG = LoggerFactory .getLogger (
60+ ITestS3ACommitterFactory .class );
61+ /**
62+ * Name for invalid committer: {@value}.
63+ */
64+ private static final String INVALID_NAME = "invalid-name" ;
5265
5366 /**
5467 * Counter to guarantee that even in parallel test runs, no job has the same
@@ -72,121 +85,156 @@ public class ITestS3ACommitterFactory extends AbstractCommitITest {
7285 * Parameterized list of bindings of committer name in config file to
7386 * expected class instantiated.
7487 */
75- private static final Object [][] bindings = {
76- {COMMITTER_NAME_FILE , FileOutputCommitter .class },
77- {COMMITTER_NAME_DIRECTORY , DirectoryStagingCommitter .class },
78- {COMMITTER_NAME_PARTITIONED , PartitionedStagingCommitter .class },
79- {InternalCommitterConstants .COMMITTER_NAME_STAGING ,
80- StagingCommitter .class },
81- {COMMITTER_NAME_MAGIC , MagicS3GuardCommitter .class }
88+ private static final Object [][] BINDINGS = {
89+ {"" , "" , FileOutputCommitter .class , "Default Binding" },
90+ {COMMITTER_NAME_FILE , "" , FileOutputCommitter .class , "File committer in FS" },
91+ {COMMITTER_NAME_PARTITIONED , "" , PartitionedStagingCommitter .class ,
92+ "partitoned committer in FS" },
93+ {COMMITTER_NAME_STAGING , "" , StagingCommitter .class , "staging committer in FS" },
94+ {COMMITTER_NAME_MAGIC , "" , MagicS3GuardCommitter .class , "magic committer in FS" },
95+ {COMMITTER_NAME_DIRECTORY , "" , DirectoryStagingCommitter .class , "Dir committer in FS" },
96+ {INVALID_NAME , "" , null , "invalid committer in FS" },
97+
98+ {"" , COMMITTER_NAME_FILE , FileOutputCommitter .class , "File committer in task" },
99+ {"" , COMMITTER_NAME_PARTITIONED , PartitionedStagingCommitter .class ,
100+ "partioned committer in task" },
101+ {"" , COMMITTER_NAME_STAGING , StagingCommitter .class , "staging committer in task" },
102+ {"" , COMMITTER_NAME_MAGIC , MagicS3GuardCommitter .class , "magic committer in task" },
103+ {"" , COMMITTER_NAME_DIRECTORY , DirectoryStagingCommitter .class , "Dir committer in task" },
104+ {"" , INVALID_NAME , null , "invalid committer in task" },
82105 };
83106
84107 /**
85- * This is a ref to the FS conf, so changes here are visible
86- * to callers querying the FS config.
108+ * Test array for parameterized test runs.
109+ *
110+ * @return the committer binding for this run.
87111 */
88- private Configuration filesystemConfRef ;
89-
90- private Configuration taskConfRef ;
112+ @ Parameterized .Parameters (name = "{3}-fs=[{0}]-task=[{1}]-[{2}]" )
113+ public static Collection <Object []> params () {
114+ return Arrays .asList (BINDINGS );
115+ }
91116
92- @ Override
93- public void setup () throws Exception {
94- super .setup ();
95- jobId = randomJobId ();
96- attempt0 = "attempt_" + jobId + "_m_000000_0" ;
97- taskAttempt0 = TaskAttemptID .forName (attempt0 );
117+ /**
118+ * Name of committer to set in filesystem config. If "" do not set one.
119+ */
120+ private final String fsCommitterName ;
98121
99- outDir = path (getMethodName ());
100- factory = new S3ACommitterFactory ();
101- Configuration conf = new Configuration ();
102- conf .set (FileOutputFormat .OUTDIR , outDir .toUri ().toString ());
103- conf .set (MRJobConfig .TASK_ATTEMPT_ID , attempt0 );
104- conf .setInt (MRJobConfig .APPLICATION_ATTEMPT_ID , 1 );
105- filesystemConfRef = getFileSystem ().getConf ();
106- tContext = new TaskAttemptContextImpl (conf , taskAttempt0 );
107- taskConfRef = tContext .getConfiguration ();
108- }
122+ /**
123+ * Name of committer to set in job config.
124+ */
125+ private final String jobCommitterName ;
109126
110- @ Test
111- public void testEverything () throws Throwable {
112- testImplicitFileBinding ();
113- testBindingsInTask ();
114- testBindingsInFSConfig ();
115- testInvalidFileBinding ();
116- testInvalidTaskBinding ();
117- }
127+ /**
128+ * Expected committer class.
129+ * If null: an exception is expected
130+ */
131+ private final Class <? extends AbstractS3ACommitter > committerClass ;
118132
119133 /**
120- * Verify that if all config options are unset, the FileOutputCommitter
121- *
122- * is returned.
134+ * Description from parameters, simply for thread names to be more informative.
123135 */
124- public void testImplicitFileBinding () throws Throwable {
125- taskConfRef .unset (FS_S3A_COMMITTER_NAME );
126- filesystemConfRef .unset (FS_S3A_COMMITTER_NAME );
127- assertFactoryCreatesExpectedCommitter (FileOutputCommitter .class );
128- }
136+ private final String description ;
129137
130138 /**
131- * Verify that task bindings are picked up.
139+ * Create a parameterized instance.
140+ * @param fsCommitterName committer to set in filesystem config
141+ * @param jobCommitterName committer to set in job config
142+ * @param committerClass expected committer class
143+ * @param description debug text for thread names.
132144 */
133- public void testBindingsInTask () throws Throwable {
134- // set this to an invalid value to be confident it is not
135- // being checked.
136- filesystemConfRef .set (FS_S3A_COMMITTER_NAME , "INVALID" );
137- taskConfRef .set (FS_S3A_COMMITTER_NAME , COMMITTER_NAME_FILE );
138- assertFactoryCreatesExpectedCommitter (FileOutputCommitter .class );
139- for (Object [] binding : bindings ) {
140- taskConfRef .set (FS_S3A_COMMITTER_NAME ,
141- (String ) binding [0 ]);
142- assertFactoryCreatesExpectedCommitter ((Class ) binding [1 ]);
143- }
145+ public ITestS3ACommitterFactory (
146+ final String fsCommitterName ,
147+ final String jobCommitterName ,
148+ final Class <? extends AbstractS3ACommitter > committerClass ,
149+ final String description ) {
150+ this .fsCommitterName = fsCommitterName ;
151+ this .jobCommitterName = jobCommitterName ;
152+ this .committerClass = committerClass ;
153+ this .description = description ;
154+ }
155+
156+ @ Override
157+ protected Configuration createConfiguration () {
158+ final Configuration conf = super .createConfiguration ();
159+ // do not cache, because we want the committer one to pick up
160+ // the fs with fs-specific configuration
161+ conf .setBoolean (FS_S3A_IMPL_DISABLE_CACHE , false );
162+ removeBaseAndBucketOverrides (conf , FS_S3A_COMMITTER_NAME );
163+ maybeSetCommitterName (conf , fsCommitterName );
164+ return conf ;
144165 }
145166
146167 /**
147- * Verify that FS bindings are picked up.
168+ * Set a committer name in a configuration.
169+ * @param conf configuration to patch.
170+ * @param name name. If "" the option is unset.
148171 */
149- public void testBindingsInFSConfig () throws Throwable {
150- taskConfRef .unset (FS_S3A_COMMITTER_NAME );
151- filesystemConfRef .set (FS_S3A_COMMITTER_NAME , COMMITTER_NAME_FILE );
152- assertFactoryCreatesExpectedCommitter (FileOutputCommitter .class );
153- for (Object [] binding : bindings ) {
154- taskConfRef .set (FS_S3A_COMMITTER_NAME , (String ) binding [0 ]);
155- assertFactoryCreatesExpectedCommitter ((Class ) binding [1 ]);
172+ private static void maybeSetCommitterName (final Configuration conf , final String name ) {
173+ if (!name .isEmpty ()) {
174+ conf .set (FS_S3A_COMMITTER_NAME , name );
175+ } else {
176+ conf .unset (FS_S3A_COMMITTER_NAME );
156177 }
157178 }
158179
159- /**
160- * Create an invalid committer via the FS binding.
161- */
162- public void testInvalidFileBinding () throws Throwable {
163- taskConfRef .unset (FS_S3A_COMMITTER_NAME );
164- filesystemConfRef .set (FS_S3A_COMMITTER_NAME , INVALID_NAME );
165- LambdaTestUtils .intercept (PathCommitException .class , INVALID_NAME ,
166- () -> createCommitter ());
180+ @ Override
181+ public void setup () throws Exception {
182+ // destroy all filesystems from previous runs.
183+ FileSystem .closeAllForUGI (UserGroupInformation .getCurrentUser ());
184+ super .setup ();
185+ jobId = randomJobId ();
186+ attempt0 = "attempt_" + jobId + "_m_000000_0" ;
187+ taskAttempt0 = TaskAttemptID .forName (attempt0 );
188+
189+ outDir = methodPath ();
190+ factory = new S3ACommitterFactory ();
191+ final Configuration fsConf = getConfiguration ();
192+ JobConf jobConf = new JobConf (fsConf );
193+ jobConf .set (FileOutputFormat .OUTDIR , outDir .toUri ().toString ());
194+ jobConf .set (MRJobConfig .TASK_ATTEMPT_ID , attempt0 );
195+ jobConf .setInt (MRJobConfig .APPLICATION_ATTEMPT_ID , 1 );
196+ maybeSetCommitterName (jobConf , jobCommitterName );
197+ tContext = new TaskAttemptContextImpl (jobConf , taskAttempt0 );
198+
199+ LOG .info ("{}: Filesystem Committer='{}'; task='{}'" ,
200+ description ,
201+ fsConf .get (FS_S3A_COMMITTER_NAME ),
202+ jobConf .get (FS_S3A_COMMITTER_NAME ));
203+ }
204+
205+
206+ @ Override
207+ protected void deleteTestDirInTeardown () {
208+ // no-op
167209 }
168210
169211 /**
170- * Create an invalid committer via the task attempt.
212+ * Verify that if all config options are unset, the FileOutputCommitter
213+ * is returned.
171214 */
172- public void testInvalidTaskBinding () throws Throwable {
173- filesystemConfRef .unset (FS_S3A_COMMITTER_NAME );
174- taskConfRef .set (FS_S3A_COMMITTER_NAME , INVALID_NAME );
175- LambdaTestUtils .intercept (PathCommitException .class , INVALID_NAME ,
176- () -> createCommitter ());
215+ @ Test
216+ public void testBinding () throws Throwable {
217+ assertFactoryCreatesExpectedCommitter (committerClass );
177218 }
178219
179220 /**
180221 * Assert that the factory creates the expected committer.
222+ * If a null committer is passed in, a {@link PathIOException}
223+ * is expected.
181224 * @param expected expected committer class.
182- * @throws IOException IO failure.
225+ * @throws Exception IO failure.
183226 */
184- protected void assertFactoryCreatesExpectedCommitter (
227+ private void assertFactoryCreatesExpectedCommitter (
185228 final Class expected )
186- throws IOException {
187- assertEquals ("Wrong Committer from factory" ,
188- expected ,
189- createCommitter ().getClass ());
229+ throws Exception {
230+ describe ("Creating committer: expected class \" %s\" " , expected );
231+ if (expected != null ) {
232+ assertEquals ("Wrong Committer from factory" ,
233+ expected ,
234+ createCommitter ().getClass ());
235+ } else {
236+ intercept (PathCommitException .class , this ::createCommitter );
237+ }
190238 }
191239
192240 /**
0 commit comments