Skip to content

Commit 49cad07

Browse files
committed
fix apache#50: add namespace-pull logic
1 parent 5a597c8 commit 49cad07

File tree

8 files changed

+432
-13
lines changed

8 files changed

+432
-13
lines changed
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.ssm;
19+
20+
import org.apache.hadoop.hdfs.DFSClient;
21+
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
22+
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
23+
import org.apache.hadoop.ssm.sql.DBAdapter;
24+
import org.apache.hadoop.ssm.sql.FileStatusInternal;
25+
26+
import java.io.IOException;
27+
import java.util.ArrayDeque;
28+
import java.util.ArrayList;
29+
import java.util.Collections;
30+
import java.util.Timer;
31+
import java.util.TimerTask;
32+
import java.util.concurrent.LinkedBlockingDeque;
33+
34+
public class NamespaceFetcher {
35+
private static final Long DEFAULT_INTERVAL = 1000L;
36+
private final long fetchInterval;
37+
private FileStatusConsumer consumer;
38+
private FetchTask fetchTask;
39+
private Timer timer;
40+
41+
public NamespaceFetcher(DFSClient client, DBAdapter adapter) {
42+
this(client, adapter, DEFAULT_INTERVAL);
43+
}
44+
45+
public NamespaceFetcher(DFSClient client, DBAdapter adapter, long fetchInterval) {
46+
this.timer = new Timer();
47+
this.fetchTask = new FetchTask(client);
48+
this.consumer = new FileStatusConsumer(adapter, fetchTask);
49+
this.fetchInterval = fetchInterval;
50+
}
51+
52+
public void startFetch() throws IOException {
53+
this.timer.schedule(fetchTask, 0, fetchInterval);
54+
this.consumer.start();
55+
}
56+
57+
public boolean fetchFinished() {
58+
return this.fetchTask.finished();
59+
}
60+
61+
public void stop() {
62+
this.timer.cancel();
63+
this.consumer.interrupt();
64+
}
65+
66+
private static class FetchTask extends TimerTask {
67+
private final static int DEFAULT_BATCH_SIZE = 20;
68+
private final static String ROOT = "/";
69+
private final HdfsFileStatus[] EMPTY_STATUS = new HdfsFileStatus[0];
70+
private final DFSClient client;
71+
// Deque for Breadth-First-Search
72+
private ArrayDeque<String> deque;
73+
// Queue for outer-consumer to fetch file status
74+
private LinkedBlockingDeque<FileStatusInternalBatch> batches;
75+
private FileStatusInternalBatch currentBatch;
76+
77+
public FetchTask(DFSClient client) {
78+
this.deque = new ArrayDeque<>();
79+
this.batches = new LinkedBlockingDeque<>();
80+
this.currentBatch = new FileStatusInternalBatch(DEFAULT_BATCH_SIZE);
81+
this.client = client;
82+
this.deque.add(ROOT);
83+
}
84+
85+
@Override
86+
public void run() {
87+
String parent = deque.pollFirst();
88+
if (parent == null) { // BFS finished
89+
if (currentBatch.actualSize() > 0) {
90+
this.batches.add(currentBatch);
91+
this.currentBatch = new FileStatusInternalBatch(DEFAULT_BATCH_SIZE);
92+
}
93+
return;
94+
}
95+
try {
96+
HdfsFileStatus status = client.getFileInfo(parent);
97+
if (status != null && status.isDir()) {
98+
FileStatusInternal internal = new FileStatusInternal(status);
99+
internal.setPath(parent);
100+
this.addFileStatus(internal);
101+
HdfsFileStatus[] children = this.listStatus(parent);
102+
for (HdfsFileStatus child : children) {
103+
if (child.isDir()) {
104+
this.deque.add(child.getFullName(parent));
105+
} else {
106+
this.addFileStatus(new FileStatusInternal(child, parent));
107+
}
108+
}
109+
}
110+
} catch (IOException | InterruptedException e) {
111+
e.printStackTrace();
112+
}
113+
}
114+
115+
public boolean finished() {
116+
return this.deque.size() == 0;
117+
}
118+
119+
public FileStatusInternalBatch pollBatch() {
120+
return this.batches.poll();
121+
}
122+
123+
public void addFileStatus(FileStatusInternal status) throws InterruptedException {
124+
this.currentBatch.add(status);
125+
if (this.currentBatch.isFull()) {
126+
this.batches.put(currentBatch);
127+
this.currentBatch = new FileStatusInternalBatch(DEFAULT_BATCH_SIZE);
128+
}
129+
}
130+
131+
// Code copy form Hdfs.java
132+
private HdfsFileStatus[] listStatus(String src) throws IOException {
133+
DirectoryListing thisListing = client.listPaths(
134+
src, HdfsFileStatus.EMPTY_NAME);
135+
if (thisListing == null) {
136+
// the directory does not exist
137+
return EMPTY_STATUS;
138+
}
139+
HdfsFileStatus[] partialListing = thisListing.getPartialListing();
140+
if (!thisListing.hasMore()) {
141+
// got all entries of the directory
142+
return partialListing;
143+
}
144+
// The directory size is too big that it needs to fetch more
145+
// estimate the total number of entries in the directory
146+
int totalNumEntries =
147+
partialListing.length + thisListing.getRemainingEntries();
148+
ArrayList<HdfsFileStatus> listing = new ArrayList<>(totalNumEntries);
149+
Collections.addAll(listing, partialListing);
150+
151+
// now fetch more entries
152+
do {
153+
thisListing = client.listPaths(src, thisListing.getLastName());
154+
155+
if (thisListing == null) {
156+
// the directory is deleted
157+
listing.toArray(new HdfsFileStatus[listing.size()]);
158+
}
159+
160+
partialListing = thisListing.getPartialListing();
161+
Collections.addAll(listing, partialListing);
162+
} while (thisListing.hasMore());
163+
164+
return listing.toArray(new HdfsFileStatus[listing.size()]);
165+
}
166+
}
167+
168+
private static class FileStatusConsumer extends Thread {
169+
private final DBAdapter dbAdapter;
170+
private final FetchTask fetchTask;
171+
172+
protected FileStatusConsumer(DBAdapter dbAdapter, FetchTask fetchTask) {
173+
this.dbAdapter = dbAdapter;
174+
this.fetchTask = fetchTask;
175+
}
176+
177+
@Override
178+
public void run() {
179+
try {
180+
while (!Thread.currentThread().isInterrupted()) {
181+
FileStatusInternalBatch batch = fetchTask.pollBatch();
182+
if (batch != null) {
183+
FileStatusInternal[] statuses = batch.getFileStatuses();
184+
if (statuses.length == batch.actualSize()) {
185+
this.dbAdapter.insertFiles(batch.getFileStatuses());
186+
} else {
187+
FileStatusInternal[] actual = new FileStatusInternal[batch.actualSize()];
188+
System.arraycopy(statuses, 0, actual, 0, batch.actualSize());
189+
this.dbAdapter.insertFiles(actual);
190+
}
191+
} else {
192+
Thread.sleep(100);
193+
}
194+
}
195+
} catch (InterruptedException e) {
196+
e.printStackTrace();
197+
}
198+
}
199+
}
200+
201+
private static class FileStatusInternalBatch {
202+
private FileStatusInternal[] fileStatuses;
203+
private final int batchSize;
204+
private int index;
205+
206+
public FileStatusInternalBatch(int batchSize) {
207+
this.batchSize = batchSize;
208+
this.fileStatuses = new FileStatusInternal[batchSize];
209+
this.index = 0;
210+
}
211+
212+
public void add(FileStatusInternal status) {
213+
this.fileStatuses[index] = status;
214+
index += 1;
215+
}
216+
217+
public boolean isFull() {
218+
return index == batchSize;
219+
}
220+
221+
public int actualSize() {
222+
return this.index;
223+
}
224+
225+
public FileStatusInternal[] getFileStatuses() {
226+
return this.fileStatuses;
227+
}
228+
}
229+
}

hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/sql/DBAdapter.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,12 @@ public synchronized void insertAccessCountData(long startTime, long endTime,
129129
*
130130
* @param files
131131
*/
132-
public synchronized void insertFiles(HdfsFileStatus[] files) {
132+
public synchronized void insertFiles(FileStatusInternal[] files) {
133133
updateCache();
134134
try {
135135
Statement s = conn.createStatement();
136136
for (int i = 0; i < files.length; i++) {
137-
String sql = "INSERT INTO 'files' VALUES('" + files[i].getLocalName() +
137+
String sql = "INSERT INTO 'files' VALUES('" + files[i].getPath() +
138138
"','" + files[i].getFileId() + "','" + files[i].getLen() + "','" +
139139
files[i].getReplication() + "','" + files[i].getBlockSize() + "','" +
140140
files[i].getModificationTime() + "','" + files[i].getAccessTime() +
@@ -152,9 +152,9 @@ public synchronized void insertFiles(HdfsFileStatus[] files) {
152152
}
153153

154154
public int booleanToInt(boolean b) {
155-
if (b == true) {
155+
if (b) {
156156
return 1;
157-
}else {
157+
} else {
158158
return 0;
159159
}
160160
}
@@ -168,8 +168,8 @@ public Integer getKey(Map<Integer, String> map, String value) {
168168
return null;
169169
}
170170
public Integer getKey(Map<Integer, ErasureCodingPolicy> map, ErasureCodingPolicy value) {
171-
for (Integer key: map.keySet()) {
172-
if (map.get(key) .equals(value)) {
171+
for (Integer key : map.keySet()) {
172+
if (map.get(key).equals(value)) {
173173
return key;
174174
}
175175
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.ssm.sql;
19+
20+
import org.apache.hadoop.fs.FileEncryptionInfo;
21+
import org.apache.hadoop.fs.permission.FsPermission;
22+
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
23+
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
24+
25+
public class FileStatusInternal extends HdfsFileStatus {
26+
private String path;
27+
28+
/**
29+
* Constructor
30+
*
31+
* @param length the number of bytes the file has
32+
* @param isdir if the path is a directory
33+
* @param block_replication the replication factor
34+
* @param blocksize the block size
35+
* @param modification_time modification time
36+
* @param access_time access time
37+
* @param permission permission
38+
* @param owner the owner of the path
39+
* @param group the group of the path
40+
* @param symlink
41+
* @param path the local name in java UTF8 encoding the same as that in-memory
42+
* @param fileId the file id
43+
* @param childrenNum
44+
* @param feInfo the file's encryption info
45+
* @param storagePolicy
46+
* @param ecPolicy
47+
*/
48+
public FileStatusInternal(long length, boolean isdir, int block_replication,
49+
long blocksize, long modification_time, long access_time,
50+
FsPermission permission, String owner, String group, byte[] symlink,
51+
byte[] path, String parent, long fileId, int childrenNum, FileEncryptionInfo feInfo,
52+
byte storagePolicy, ErasureCodingPolicy ecPolicy) {
53+
super(length, isdir, block_replication, blocksize, modification_time,
54+
access_time, permission, owner, group, symlink, path, fileId, childrenNum,
55+
feInfo, storagePolicy, ecPolicy);
56+
this.path = this.getFullName(parent);
57+
}
58+
59+
public FileStatusInternal(HdfsFileStatus status, String parent) {
60+
this(status.getLen(), status.isDir(), status.getReplication(),
61+
status.getBlockSize(), status.getModificationTime(), status.getAccessTime(),
62+
status.getPermission(), status.getOwner(), status.getGroup(), status.getSymlinkInBytes(),
63+
status.getLocalNameInBytes(), parent, status.getFileId(), status.getChildrenNum(),
64+
status.getFileEncryptionInfo(), status.getStoragePolicy(),
65+
status.getErasureCodingPolicy());
66+
}
67+
68+
public FileStatusInternal(HdfsFileStatus status) {
69+
this(status, "");
70+
}
71+
72+
public String getPath() {
73+
return this.path;
74+
}
75+
76+
public void setPath(String path) {
77+
this.path = path;
78+
}
79+
}

hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/sql/Util.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
118
package org.apache.hadoop.ssm.sql;
219

320
import java.sql.Connection;

hadoop-ssm-project/src/main/java/org/apache/hadoop/ssm/sql/tables/AccessCountTableManager.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,4 @@ private boolean spanAcrossTwoMinutes(long first, long second) {
129129
return first / Constants.ONE_MINUTE_IN_MILLIS !=
130130
second / Constants.ONE_MINUTE_IN_MILLIS;
131131
}
132-
133-
private String insertAccessCountValuesSQL() {
134-
return "";
135-
}
136132
}

hadoop-ssm-project/src/test/java/org/apache/hadoop/ssm/MoveToSSDTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
118
package org.apache.hadoop.ssm;
219

320
import org.apache.hadoop.conf.Configuration;

0 commit comments

Comments
 (0)