|
18 | 18 | package org.apache.hadoop.fs.http.server; |
19 | 19 |
|
20 | 20 | import org.apache.hadoop.classification.InterfaceAudience; |
| 21 | +import org.apache.hadoop.conf.Configuration; |
21 | 22 | import org.apache.hadoop.fs.BlockStoragePolicySpi; |
22 | 23 | import org.apache.hadoop.fs.ContentSummary; |
23 | 24 | import org.apache.hadoop.fs.FileChecksum; |
|
47 | 48 | import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; |
48 | 49 | import org.apache.hadoop.hdfs.protocol.SnapshotStatus; |
49 | 50 | import org.apache.hadoop.hdfs.web.JsonUtil; |
50 | | -import org.apache.hadoop.io.IOUtils; |
51 | 51 | import org.apache.hadoop.lib.service.FileSystemAccess; |
52 | 52 | import org.apache.hadoop.util.StringUtils; |
53 | 53 | import org.json.simple.JSONArray; |
|
73 | 73 | * FileSystem operation executors used by {@link HttpFSServer}. |
74 | 74 | */ |
75 | 75 | @InterfaceAudience.Private |
76 | | -public class FSOperations { |
| 76 | +public final class FSOperations { |
| 77 | + |
| 78 | + private static int bufferSize = 4096; |
| 79 | + |
| 80 | + private FSOperations() { |
| 81 | + // not called |
| 82 | + } |
| 83 | + /** |
| 84 | + * Set the buffer size. The size is set during the initialization of |
| 85 | + * HttpFSServerWebApp. |
| 86 | + * @param conf the configuration to get the bufferSize |
| 87 | + */ |
| 88 | + public static void setBufferSize(Configuration conf) { |
| 89 | + bufferSize = conf.getInt(HTTPFS_BUFFER_SIZE_KEY, |
| 90 | + HTTP_BUFFER_SIZE_DEFAULT); |
| 91 | + } |
77 | 92 |
|
78 | 93 | /** |
79 | 94 | * @param fileStatus a FileStatus object |
@@ -436,10 +451,9 @@ public FSAppend(InputStream is, String path) { |
436 | 451 | */ |
437 | 452 | @Override |
438 | 453 | public Void execute(FileSystem fs) throws IOException { |
439 | | - int bufferSize = fs.getConf().getInt("httpfs.buffer.size", 4096); |
440 | 454 | OutputStream os = fs.append(path, bufferSize); |
441 | | - IOUtils.copyBytes(is, os, bufferSize, true); |
442 | | - os.close(); |
| 455 | + long bytes = copyBytes(is, os); |
| 456 | + HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes); |
443 | 457 | return null; |
444 | 458 | } |
445 | 459 |
|
@@ -522,6 +536,7 @@ public FSTruncate(String path, long newLength) { |
522 | 536 | @Override |
523 | 537 | public JSONObject execute(FileSystem fs) throws IOException { |
524 | 538 | boolean result = fs.truncate(path, newLength); |
| 539 | + HttpFSServerWebApp.get().getMetrics().incrOpsTruncate(); |
525 | 540 | return toJSON( |
526 | 541 | StringUtils.toLowerCase(HttpFSFileSystem.TRUNCATE_JSON), result); |
527 | 542 | } |
@@ -638,16 +653,65 @@ public Void execute(FileSystem fs) throws IOException { |
638 | 653 | fsPermission = FsCreateModes.create(fsPermission, |
639 | 654 | new FsPermission(unmaskedPermission)); |
640 | 655 | } |
641 | | - int bufferSize = fs.getConf().getInt(HTTPFS_BUFFER_SIZE_KEY, |
642 | | - HTTP_BUFFER_SIZE_DEFAULT); |
643 | 656 | OutputStream os = fs.create(path, fsPermission, override, bufferSize, replication, blockSize, null); |
644 | | - IOUtils.copyBytes(is, os, bufferSize, true); |
645 | | - os.close(); |
| 657 | + long bytes = copyBytes(is, os); |
| 658 | + HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes); |
646 | 659 | return null; |
647 | 660 | } |
648 | 661 |
|
649 | 662 | } |
650 | 663 |
|
| 664 | + /** |
| 665 | + * These copyBytes methods combines the two different flavors used originally. |
| 666 | + * One with length and another one with buffer size. |
| 667 | + * In this impl, buffer size is determined internally, which is a singleton |
| 668 | + * normally set during initialization. |
| 669 | + * @param in the inputStream |
| 670 | + * @param out the outputStream |
| 671 | + * @return the totalBytes |
| 672 | + * @throws IOException the exception to be thrown. |
| 673 | + */ |
| 674 | + public static long copyBytes(InputStream in, OutputStream out) |
| 675 | + throws IOException { |
| 676 | + return copyBytes(in, out, Long.MAX_VALUE); |
| 677 | + } |
| 678 | + |
| 679 | + public static long copyBytes(InputStream in, OutputStream out, long count) |
| 680 | + throws IOException { |
| 681 | + long totalBytes = 0; |
| 682 | + |
| 683 | + // If bufferSize is not initialized use 4k. This will not happen |
| 684 | + // if all callers check and set it. |
| 685 | + byte[] buf = new byte[bufferSize]; |
| 686 | + long bytesRemaining = count; |
| 687 | + int bytesRead; |
| 688 | + |
| 689 | + try { |
| 690 | + while (bytesRemaining > 0) { |
| 691 | + int bytesToRead = (int) |
| 692 | + (bytesRemaining < buf.length ? bytesRemaining : buf.length); |
| 693 | + |
| 694 | + bytesRead = in.read(buf, 0, bytesToRead); |
| 695 | + if (bytesRead == -1) { |
| 696 | + break; |
| 697 | + } |
| 698 | + |
| 699 | + out.write(buf, 0, bytesRead); |
| 700 | + bytesRemaining -= bytesRead; |
| 701 | + totalBytes += bytesRead; |
| 702 | + } |
| 703 | + return totalBytes; |
| 704 | + } finally { |
| 705 | + // Originally IOUtils.copyBytes() were called with close=true. So we are |
| 706 | + // implementing the same behavior here. |
| 707 | + try { |
| 708 | + in.close(); |
| 709 | + } finally { |
| 710 | + out.close(); |
| 711 | + } |
| 712 | + } |
| 713 | + } |
| 714 | + |
651 | 715 | /** |
652 | 716 | * Executor that performs a delete FileSystemAccess files system operation. |
653 | 717 | */ |
@@ -680,6 +744,7 @@ public FSDelete(String path, boolean recursive) { |
680 | 744 | @Override |
681 | 745 | public JSONObject execute(FileSystem fs) throws IOException { |
682 | 746 | boolean deleted = fs.delete(path, recursive); |
| 747 | + HttpFSServerWebApp.get().getMetrics().incrOpsDelete(); |
683 | 748 | return toJSON( |
684 | 749 | StringUtils.toLowerCase(HttpFSFileSystem.DELETE_JSON), deleted); |
685 | 750 | } |
@@ -748,6 +813,7 @@ public FSFileStatus(String path) { |
748 | 813 | @Override |
749 | 814 | public Map execute(FileSystem fs) throws IOException { |
750 | 815 | FileStatus status = fs.getFileStatus(path); |
| 816 | + HttpFSServerWebApp.get().getMetrics().incrOpsStat(); |
751 | 817 | return toJson(status); |
752 | 818 | } |
753 | 819 |
|
@@ -776,7 +842,6 @@ public JSONObject execute(FileSystem fs) throws IOException { |
776 | 842 | json.put(HttpFSFileSystem.HOME_DIR_JSON, homeDir.toUri().getPath()); |
777 | 843 | return json; |
778 | 844 | } |
779 | | - |
780 | 845 | } |
781 | 846 |
|
782 | 847 | /** |
@@ -814,6 +879,7 @@ public FSListStatus(String path, String filter) throws IOException { |
814 | 879 | @Override |
815 | 880 | public Map execute(FileSystem fs) throws IOException { |
816 | 881 | FileStatus[] fileStatuses = fs.listStatus(path, filter); |
| 882 | + HttpFSServerWebApp.get().getMetrics().incrOpsListing(); |
817 | 883 | return toJson(fileStatuses, fs.getFileStatus(path).isFile()); |
818 | 884 | } |
819 | 885 |
|
@@ -905,6 +971,7 @@ public JSONObject execute(FileSystem fs) throws IOException { |
905 | 971 | new FsPermission(unmaskedPermission)); |
906 | 972 | } |
907 | 973 | boolean mkdirs = fs.mkdirs(path, fsPermission); |
| 974 | + HttpFSServerWebApp.get().getMetrics().incrOpsMkdir(); |
908 | 975 | return toJSON(HttpFSFileSystem.MKDIRS_JSON, mkdirs); |
909 | 976 | } |
910 | 977 |
|
@@ -937,8 +1004,8 @@ public FSOpen(String path) { |
937 | 1004 | */ |
938 | 1005 | @Override |
939 | 1006 | public InputStream execute(FileSystem fs) throws IOException { |
940 | | - int bufferSize = HttpFSServerWebApp.get().getConfig().getInt( |
941 | | - HTTPFS_BUFFER_SIZE_KEY, HTTP_BUFFER_SIZE_DEFAULT); |
| 1007 | + // Only updating ops count. bytesRead is updated in InputStreamEntity |
| 1008 | + HttpFSServerWebApp.get().getMetrics().incrOpsOpen(); |
942 | 1009 | return fs.open(path, bufferSize); |
943 | 1010 | } |
944 | 1011 |
|
@@ -976,6 +1043,7 @@ public FSRename(String path, String toPath) { |
976 | 1043 | @Override |
977 | 1044 | public JSONObject execute(FileSystem fs) throws IOException { |
978 | 1045 | boolean renamed = fs.rename(path, toPath); |
| 1046 | + HttpFSServerWebApp.get().getMetrics().incrOpsRename(); |
979 | 1047 | return toJSON(HttpFSFileSystem.RENAME_JSON, renamed); |
980 | 1048 | } |
981 | 1049 |
|
@@ -1944,6 +2012,7 @@ public Void execute(FileSystem fs) throws IOException { |
1944 | 2012 | if (fs instanceof DistributedFileSystem) { |
1945 | 2013 | DistributedFileSystem dfs = (DistributedFileSystem) fs; |
1946 | 2014 | dfs.access(path, mode); |
| 2015 | + HttpFSServerWebApp.get().getMetrics().incrOpsCheckAccess(); |
1947 | 2016 | } else { |
1948 | 2017 | throw new UnsupportedOperationException("checkaccess is " |
1949 | 2018 | + "not supported for HttpFs on " + fs.getClass() |
|
0 commit comments