3838public class LogAggregationUtils {
3939
// File-name suffix literal ".tmp"; its users are outside this excerpt.
4040 public static final String TMP_FILE_SUFFIX = ".tmp" ;
// Despite the name, getRemoteLogSuffixedDir prepends this text to the
// configured log dir suffix (getBucketSuffix() + suffix).
41+ private static final String BUCKET_SUFFIX = "bucket_" ;
4142
4243 /**
4344 * Constructs the full filename for an application's log file per node.
@@ -64,8 +65,22 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
6465 */
6566 public static Path getRemoteAppLogDir (Path remoteRootLogDir ,
6667 ApplicationId appId , String user , String suffix ) {
67- return new Path (getRemoteLogSuffixedDir (remoteRootLogDir , user , suffix ),
68- appId .toString ());
68+ return new Path (getRemoteBucketDir (remoteRootLogDir , user , suffix ,
69+ appId ), appId .toString ());
70+ }
71+
72+ /**
73+ * Gets the older remote app log dir.
74+ * @param appId the application id
75+ * @param user the application owner
76+ * @param remoteRootLogDir the aggregated log remote root log dir
77+ * @param suffix the log directory suffix
78+ * @return the remote application specific log dir.
79+ */
80+ public static Path getOlderRemoteAppLogDir (ApplicationId appId ,
81+ String user , Path remoteRootLogDir , String suffix ) {
82+ return new Path (getOlderRemoteLogSuffixedDir (remoteRootLogDir , user ,
83+ suffix ), appId .toString ());
6984 }
7085
7186 /**
@@ -77,6 +92,19 @@ public static Path getRemoteAppLogDir(Path remoteRootLogDir,
7792 */
7893 public static Path getRemoteLogSuffixedDir (Path remoteRootLogDir ,
7994 String user , String suffix ) {
95+ suffix = getBucketSuffix () + suffix ;
96+ return new Path (getRemoteLogUserDir (remoteRootLogDir , user ), suffix );
97+ }
98+
99+ /**
100+ * Gets the older remote suffixed log dir for the user.
101+ * @param remoteRootLogDir the aggregated log remote root log dir
102+ * @param user the application owner
103+ * @param suffix the log dir suffix
104+ * @return the older remote suffixed log dir.
105+ */
106+ public static Path getOlderRemoteLogSuffixedDir (Path remoteRootLogDir ,
107+ String user , String suffix ) {
80108 if (suffix == null || suffix .isEmpty ()) {
81109 return getRemoteLogUserDir (remoteRootLogDir , user );
82110 }
@@ -94,6 +122,33 @@ public static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) {
94122 return new Path (remoteRootLogDir , user );
95123 }
96124
125+ /**
126+ * Gets the remote log user's bucket dir.
127+ * @param remoteRootLogDir the aggregated log remote root log dir
128+ * @param user the application owner
129+ * @param suffix the log dir suffix
130+ * @param appId the application id
131+ * @return the remote log per user per cluster timestamp per bucket dir.
132+ */
133+ public static Path getRemoteBucketDir (Path remoteRootLogDir , String user ,
134+ String suffix , ApplicationId appId ) {
135+ int bucket = appId .getId () % 10000 ;
136+ String bucketDir = String .format ("%04d" , bucket );
137+ return new Path (getRemoteLogSuffixedDir (remoteRootLogDir ,
138+ user , suffix ), bucketDir );
139+ }
140+
141+ /**
142+ * Check if older Application Log Directory has to be included.
143+ * @param conf the configuration
144+ * @return Is Older App Log Dir enabled?
145+ */
146+ public static boolean isOlderPathEnabled (Configuration conf ) {
147+ return conf .getBoolean (YarnConfiguration .
148+ NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER ,
149+ YarnConfiguration .DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER );
150+ }
151+
97152 /**
98153 * Returns the suffix component of the log dir.
99154 * @param conf the configuration
@@ -104,6 +159,14 @@ public static String getRemoteNodeLogDirSuffix(Configuration conf) {
104159 YarnConfiguration .DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX );
105160 }
106161
162+ /**
163+ * Returns the bucket suffix component of the log dir.
164+ * @return the bucket suffix which appended to user log dir
165+ */
166+ public static String getBucketSuffix () {
167+ return BUCKET_SUFFIX ;
168+ }
169+
107170
108171 /**
109172 * Converts a nodeId to a form used in the app log file name.
@@ -174,6 +237,24 @@ public static org.apache.hadoop.fs.Path getRemoteAppLogDir(
174237 return remoteAppDir ;
175238 }
176239
240+ /**
241+ * Get all available log files under remote app log directory.
242+ * @param conf the configuration
243+ * @param remoteAppLogDir the application log directory
244+ * @param appId the applicationId
245+ * @param appOwner the application owner
246+ * @return the iterator of available log files
247+ * @throws IOException if there is no log file directory
248+ */
249+ public static RemoteIterator <FileStatus > getNodeFiles (Configuration conf ,
250+ Path remoteAppLogDir , ApplicationId appId , String appOwner )
251+ throws IOException {
252+ Path qualifiedLogDir =
253+ FileContext .getFileContext (conf ).makeQualified (remoteAppLogDir );
254+ return FileContext .getFileContext (
255+ qualifiedLogDir .toUri (), conf ).listStatus (remoteAppLogDir );
256+ }
257+
177258 /**
178259 * Get all available log files under remote app log directory.
179260 * @param conf the configuration
@@ -188,14 +269,58 @@ public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
188269 Configuration conf , ApplicationId appId , String appOwner ,
189270 org .apache .hadoop .fs .Path remoteRootLogDir , String suffix )
190271 throws IOException {
272+ RemoteIterator <FileStatus > nodeFilesCur = null ;
273+ RemoteIterator <FileStatus > nodeFilesPrev = null ;
274+ StringBuilder diagnosticsMsg = new StringBuilder ();
275+
276+ // Get Node Files from new app log dir
191277 Path remoteAppLogDir = getRemoteAppLogDir (conf , appId , appOwner ,
192278 remoteRootLogDir , suffix );
193- RemoteIterator <FileStatus > nodeFiles = null ;
194- Path qualifiedLogDir =
195- FileContext .getFileContext (conf ).makeQualified (remoteAppLogDir );
196- nodeFiles = FileContext .getFileContext (qualifiedLogDir .toUri (),
197- conf ).listStatus (remoteAppLogDir );
198- return nodeFiles ;
279+ try {
280+ nodeFilesCur = getNodeFiles (conf , remoteAppLogDir , appId , appOwner );
281+ } catch (IOException ex ) {
282+ diagnosticsMsg .append (ex .getMessage () + "\n " );
283+ }
284+
285+ // Get Node Files from old app log dir
286+ if (isOlderPathEnabled (conf )) {
287+ remoteAppLogDir = getOlderRemoteAppLogDir (appId , appOwner ,
288+ remoteRootLogDir , suffix );
289+ try {
290+ nodeFilesPrev = getNodeFiles (conf ,
291+ remoteAppLogDir , appId , appOwner );
292+ } catch (IOException ex ) {
293+ diagnosticsMsg .append (ex .getMessage () + "\n " );
294+ }
295+
296+ // Return older files if new app log dir does not exist
297+ if (nodeFilesCur == null ) {
298+ return nodeFilesPrev ;
299+ } else if (nodeFilesPrev != null ) {
300+ // Return both new and old node files combined
301+ RemoteIterator <FileStatus > curDir = nodeFilesCur ;
302+ RemoteIterator <FileStatus > prevDir = nodeFilesPrev ;
303+ RemoteIterator <FileStatus > nodeFilesCombined = new
304+ RemoteIterator <FileStatus >() {
305+ @ Override
306+ public boolean hasNext () throws IOException {
307+ return prevDir .hasNext () || curDir .hasNext ();
308+ }
309+
310+ @ Override
311+ public FileStatus next () throws IOException {
312+ return prevDir .hasNext () ? prevDir .next () : curDir .next ();
313+ }
314+ };
315+ return nodeFilesCombined ;
316+ }
317+ }
318+
319+ // Error reading from or new app log dir does not exist
320+ if (nodeFilesCur == null ) {
321+ throw new IOException (diagnosticsMsg .toString ());
322+ }
323+ return nodeFilesCur ;
199324 }
200325
201326 /**
@@ -212,13 +337,39 @@ public static List<FileStatus> getRemoteNodeFileList(
212337 Configuration conf , ApplicationId appId , String appOwner ,
213338 org .apache .hadoop .fs .Path remoteRootLogDir , String suffix )
214339 throws IOException {
340+ StringBuilder diagnosticsMsg = new StringBuilder ();
215341 Path remoteAppLogDir = getRemoteAppLogDir (conf , appId , appOwner ,
216342 remoteRootLogDir , suffix );
217343 List <FileStatus > nodeFiles = new ArrayList <>();
218344 Path qualifiedLogDir =
219345 FileContext .getFileContext (conf ).makeQualified (remoteAppLogDir );
220- nodeFiles .addAll (Arrays .asList (FileContext .getFileContext (
221- qualifiedLogDir .toUri (), conf ).util ().listStatus (remoteAppLogDir )));
346+
347+ // Get Node Files from new app log dir
348+ try {
349+ nodeFiles .addAll (Arrays .asList (FileContext .getFileContext (
350+ qualifiedLogDir .toUri (), conf ).util ().listStatus (remoteAppLogDir )));
351+ } catch (IOException ex ) {
352+ diagnosticsMsg .append (ex .getMessage () + "\n " );
353+ }
354+
355+ // Get Node Files from old app log dir
356+ if (isOlderPathEnabled (conf )) {
357+ remoteAppLogDir = getOlderRemoteAppLogDir (appId , appOwner ,
358+ remoteRootLogDir , suffix );
359+ qualifiedLogDir = FileContext .getFileContext (conf ).
360+ makeQualified (remoteAppLogDir );
361+ try {
362+ nodeFiles .addAll (Arrays .asList (FileContext .getFileContext (
363+ qualifiedLogDir .toUri (), conf ).util ().listStatus (remoteAppLogDir )));
364+ } catch (IOException ex ) {
365+ diagnosticsMsg .append (ex .getMessage () + "\n " );
366+ }
367+ }
368+
369+ // Error reading from or new app log dir does not exist
370+ if (nodeFiles .isEmpty ()) {
371+ throw new IOException (diagnosticsMsg .toString ());
372+ }
222373 return nodeFiles ;
223374 }
224375
@@ -233,12 +384,11 @@ public static List<FileStatus> getRemoteNodeFileList(
233384 public static RemoteIterator <FileStatus > getRemoteNodeFileDir (
234385 Configuration conf , ApplicationId appId , String appOwner )
235386 throws IOException {
236- Path remoteAppLogDir = getRemoteAppLogDir (conf , appId , appOwner );
237- RemoteIterator <FileStatus > nodeFiles = null ;
238- Path qualifiedLogDir =
239- FileContext .getFileContext (conf ).makeQualified (remoteAppLogDir );
240- nodeFiles = FileContext .getFileContext (qualifiedLogDir .toUri (),
241- conf ).listStatus (remoteAppLogDir );
242- return nodeFiles ;
387+ String suffix = LogAggregationUtils .getRemoteNodeLogDirSuffix (conf );
388+ Path remoteRootLogDir = new Path (conf .get (
389+ YarnConfiguration .NM_REMOTE_APP_LOG_DIR ,
390+ YarnConfiguration .DEFAULT_NM_REMOTE_APP_LOG_DIR ));
391+ return getRemoteNodeFileDir (conf , appId , appOwner ,
392+ remoteRootLogDir , suffix );
243393 }
244394}
0 commit comments