2525import java .io .BufferedWriter ;
2626import java .io .IOException ;
2727import java .util .ArrayList ;
28+ import java .util .Collections ;
2829import java .util .HashMap ;
2930import java .util .List ;
3031import java .util .Map ;
3132import java .util .Map .Entry ;
33+ import java .util .concurrent .Callable ;
34+ import java .util .concurrent .ExecutionException ;
35+ import java .util .concurrent .ExecutorService ;
36+ import java .util .concurrent .Future ;
37+ import java .util .concurrent .LinkedBlockingQueue ;
38+ import java .util .concurrent .ThreadPoolExecutor ;
3239import java .util .concurrent .TimeUnit ;
40+ import java .util .concurrent .atomic .AtomicBoolean ;
3341import java .util .regex .Matcher ;
3442import java .util .regex .Pattern ;
3543
44+ import org .apache .hadoop .thirdparty .com .google .common .util .concurrent .ThreadFactoryBuilder ;
45+
3646import org .apache .hadoop .hdfs .server .federation .metrics .StateStoreMetrics ;
3747import org .apache .hadoop .hdfs .server .federation .store .StateStoreUnavailableException ;
3848import org .apache .hadoop .hdfs .server .federation .store .StateStoreUtils ;
@@ -69,6 +79,8 @@ public abstract class StateStoreFileBaseImpl
6979 /** If it is initialized. */
7080 private boolean initialized = false ;
7181
/**
 * Thread pool used to read and write record files concurrently. It is only created by
 * initDriver() when more than one access thread is configured; it stays null otherwise, in
 * which case records are processed serially on the calling thread.
 */
82+ private ExecutorService concurrentStoreAccessPool ;
83+
7284
7385 /**
7486 * Get the reader of a record for the file system.
@@ -137,6 +149,8 @@ public abstract <T extends BaseRecord> BufferedWriter getWriter(
137149 */
138150 protected abstract String getRootDir ();
139151
/**
 * Get the number of threads to use for accessing the record files.
 *
 * @return Number of threads. initDriver() creates a fixed-size pool with this many threads
 *         when the value is greater than 1; any other value results in serial access.
 */
152+ protected abstract int getConcurrentFilesAccessNumThreads ();
153+
140154 /**
141155 * Set the driver as initialized.
142156 *
@@ -168,9 +182,31 @@ public boolean initDriver() {
168182 return false ;
169183 }
170184 setInitialized (true );
185+ int threads = getConcurrentFilesAccessNumThreads ();
186+ if (threads > 1 ) {
187+ this .concurrentStoreAccessPool =
188+ new ThreadPoolExecutor (threads , threads , 0L , TimeUnit .MILLISECONDS ,
189+ new LinkedBlockingQueue <>(),
190+ new ThreadFactoryBuilder ()
191+ .setNameFormat ("state-store-file-based-concurrent-%d" )
192+ .setDaemon (true ).build ());
193+ LOG .info ("File based state store will be accessed concurrently with {} max threads" , threads );
194+ } else {
195+ LOG .info ("File based state store will be accessed serially" );
196+ }
171197 return true ;
172198 }
173199
200+ @ Override
201+ public void close () throws Exception {
202+ if (this .concurrentStoreAccessPool != null ) {
203+ this .concurrentStoreAccessPool .shutdown ();
204+ boolean isTerminated = this .concurrentStoreAccessPool .awaitTermination (5 , TimeUnit .SECONDS );
205+ LOG .info ("Concurrent store access pool is terminated: {}" , isTerminated );
206+ this .concurrentStoreAccessPool = null ;
207+ }
208+ }
209+
174210 @ Override
175211 public <T extends BaseRecord > boolean initRecordStorage (
176212 String className , Class <T > recordClass ) {
@@ -198,22 +234,29 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
198234 verifyDriverReady ();
199235 long start = monotonicNow ();
200236 StateStoreMetrics metrics = getMetrics ();
201- List <T > ret = new ArrayList <>();
237+ List <T > result = Collections . synchronizedList ( new ArrayList <>() );
202238 try {
203239 String path = getPathForClass (clazz );
204240 List <String > children = getChildren (path );
205- for (String child : children ) {
206- String pathRecord = path + "/" + child ;
207- if (child .endsWith (TMP_MARK )) {
208- LOG .debug ("There is a temporary file {} in {}" , child , path );
209- if (isOldTempRecord (child )) {
210- LOG .warn ("Removing {} as it's an old temporary record" , child );
211- remove (pathRecord );
212- }
213- } else {
214- T record = getRecord (pathRecord , clazz );
215- ret .add (record );
241+ List <Callable <Void >> callables = new ArrayList <>();
242+ children .forEach (child -> callables .add (
243+ () -> getRecordsFromFileAndRemoveOldTmpRecords (clazz , result , path , child )));
244+ if (this .concurrentStoreAccessPool != null ) {
245+ // Read records concurrently
246+ List <Future <Void >> futures = this .concurrentStoreAccessPool .invokeAll (callables );
247+ for (Future <Void > future : futures ) {
248+ future .get ();
216249 }
250+ } else {
251+ // Read records serially
252+ callables .forEach (e -> {
253+ try {
254+ e .call ();
255+ } catch (Exception ex ) {
256+ LOG .error ("Failed to retrieve record using file operations." , ex );
257+ throw new RuntimeException (ex );
258+ }
259+ });
217260 }
218261 } catch (Exception e ) {
219262 if (metrics != null ) {
@@ -227,7 +270,37 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
227270 if (metrics != null ) {
228271 metrics .addRead (monotonicNow () - start );
229272 }
230- return new QueryResult <T >(ret , getTime ());
273+ return new QueryResult <>(result , getTime ());
274+ }
275+
276+ /**
277+ * Get the state store record from the given path (path/child) and add the record to the
278+ * result list.
279+ *
280+ * @param clazz Class of the record.
281+ * @param result The list of results record. The records would be added to it unless the given
282+ * path represents old temp file.
283+ * @param path The parent path.
284+ * @param child The child path under the parent path. Both path and child completes the file
285+ * location for the given record.
286+ * @param <T> Record class of the records.
287+ * @return Void.
288+ * @throws IOException If the file read operation fails.
289+ */
290+ private <T extends BaseRecord > Void getRecordsFromFileAndRemoveOldTmpRecords (Class <T > clazz ,
291+ List <T > result , String path , String child ) throws IOException {
292+ String pathRecord = path + "/" + child ;
293+ if (child .endsWith (TMP_MARK )) {
294+ LOG .debug ("There is a temporary file {} in {}" , child , path );
295+ if (isOldTempRecord (child )) {
296+ LOG .warn ("Removing {} as it's an old temporary record" , child );
297+ remove (pathRecord );
298+ }
299+ } else {
300+ T record = getRecord (pathRecord , clazz );
301+ result .add (record );
302+ }
303+ return null ;
231304 }
232305
233306 /**
@@ -260,23 +333,17 @@ public static boolean isOldTempRecord(final String pathRecord) {
260333 */
261334 private <T extends BaseRecord > T getRecord (
262335 final String path , final Class <T > clazz ) throws IOException {
263- BufferedReader reader = getReader (path );
264- try {
336+ try (BufferedReader reader = getReader (path )) {
265337 String line ;
266338 while ((line = reader .readLine ()) != null ) {
267339 if (!line .startsWith ("#" ) && line .length () > 0 ) {
268340 try {
269- T record = newRecord (line , clazz , false );
270- return record ;
341+ return newRecord (line , clazz , false );
271342 } catch (Exception ex ) {
272343 LOG .error ("Cannot parse line {} in file {}" , line , path , ex );
273344 }
274345 }
275346 }
276- } finally {
277- if (reader != null ) {
278- reader .close ();
279- }
280347 }
281348 throw new IOException ("Cannot read " + path + " for record " +
282349 clazz .getSimpleName ());
@@ -330,13 +397,12 @@ public <T extends BaseRecord> boolean putAll(
330397 record .setDateModified (this .getTime ());
331398 toWrite .put (recordPath , record );
332399 } else if (errorIfExists ) {
333- LOG .error ("Attempt to insert record {} that already exists" ,
334- recordPath );
400+ LOG .error ("Attempt to insert record {} that already exists" , recordPath );
335401 if (metrics != null ) {
336402 metrics .addFailure (monotonicNow () - start );
337403 }
338404 return false ;
339- } else {
405+ } else {
340406 LOG .debug ("Not updating {}" , record );
341407 }
342408 } else {
@@ -345,36 +411,81 @@ public <T extends BaseRecord> boolean putAll(
345411 }
346412
347413 // Write the records
348- boolean success = true ;
349- for (Entry <String , T > entry : toWrite .entrySet ()) {
350- String recordPath = entry .getKey ();
351- String recordPathTemp = recordPath + "." + now () + TMP_MARK ;
352- boolean recordWrittenSuccessfully = true ;
353- try (BufferedWriter writer = getWriter (recordPathTemp )) {
354- T record = entry .getValue ();
355- String line = serializeString (record );
356- writer .write (line );
357- } catch (IOException e ) {
358- LOG .error ("Cannot write {}" , recordPathTemp , e );
359- recordWrittenSuccessfully = false ;
360- success = false ;
414+ final AtomicBoolean success = new AtomicBoolean (true );
415+ final List <Callable <Void >> callables = new ArrayList <>();
416+ toWrite .entrySet ().forEach (entry -> callables .add (() -> writeRecordToFile (success , entry )));
417+ if (this .concurrentStoreAccessPool != null ) {
418+ // Write records concurrently
419+ List <Future <Void >> futures = null ;
420+ try {
421+ futures = this .concurrentStoreAccessPool .invokeAll (callables );
422+ } catch (InterruptedException e ) {
423+ success .set (false );
424+ LOG .error ("Failed to put record concurrently." , e );
361425 }
362- // Commit
363- if (recordWrittenSuccessfully && !rename (recordPathTemp , recordPath )) {
364- LOG .error ("Failed committing record into {}" , recordPath );
365- success = false ;
426+ if (futures != null ) {
427+ for (Future <Void > future : futures ) {
428+ try {
429+ future .get ();
430+ } catch (InterruptedException | ExecutionException e ) {
431+ success .set (false );
432+ LOG .error ("Failed to retrieve results from concurrent record put runs." , e );
433+ }
434+ }
366435 }
436+ } else {
437+ // Write records serially
438+ callables .forEach (callable -> {
439+ try {
440+ callable .call ();
441+ } catch (Exception e ) {
442+ success .set (false );
443+ LOG .error ("Failed to put record." , e );
444+ }
445+ });
367446 }
368447
369448 long end = monotonicNow ();
370449 if (metrics != null ) {
371- if (success ) {
450+ if (success . get () ) {
372451 metrics .addWrite (end - start );
373452 } else {
374453 metrics .addFailure (end - start );
375454 }
376455 }
377- return success ;
456+ return success .get ();
457+ }
458+
459+ /**
460+ * Writes the state store record to the file. At first, the record is written to a temp location
461+ * and then later renamed to the final location that is passed with the entry key.
462+ *
463+ * @param success The atomic boolean that gets updated to false if the file write operation fails.
464+ * @param entry The entry of the record path and the state store record to be written to the file
465+ * by first writing to a temp location and then renaming it to the record path.
466+ * @param <T> Record class of the records.
467+ * @return Void.
468+ */
469+ private <T extends BaseRecord > Void writeRecordToFile (AtomicBoolean success ,
470+ Entry <String , T > entry ) {
471+ String recordPath = entry .getKey ();
472+ String recordPathTemp = recordPath + "." + now () + TMP_MARK ;
473+ boolean recordWrittenSuccessfully = true ;
474+ try (BufferedWriter writer = getWriter (recordPathTemp )) {
475+ T record = entry .getValue ();
476+ String line = serializeString (record );
477+ writer .write (line );
478+ } catch (IOException e ) {
479+ LOG .error ("Cannot write {}" , recordPathTemp , e );
480+ recordWrittenSuccessfully = false ;
481+ success .set (false );
482+ }
483+ // Commit
484+ if (recordWrittenSuccessfully && !rename (recordPathTemp , recordPath )) {
485+ LOG .error ("Failed committing record into {}" , recordPath );
486+ success .set (false );
487+ }
488+ return null ;
378489 }
379490
380491 @ Override
0 commit comments