Skip to content

Commit da13ab1

Browse files
committed
Allow problems archiving to not block
Adds try catch (log) around calculating next action in AA Processor Adds try catch (log) around submitting an action in AA Processor Do no action if archiver is empty
1 parent 8923bb9 commit da13ab1

File tree

3 files changed

+58
-34
lines changed

3 files changed

+58
-34
lines changed

src/main/java/org/phoebus/channelfinder/processors/AAChannelProcessor.java

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,15 @@ public String processorInfo() {
8787
* If the pvStatus auto pause is set, then the pv will be auto pause resumed as well.
8888
*
8989
* @param channels List of channels
90+
* @return Return number of channels processed
9091
* @throws JsonProcessingException If processing archiver responses fail.
9192
*/
9293
@Override
93-
public void process(List<Channel> channels) throws JsonProcessingException {
94+
public long process(List<Channel> channels) throws JsonProcessingException {
9495
if (channels.isEmpty()) {
95-
return;
96+
return 0;
9697
}
98+
9799
Map<String, List<ArchivePV>> aaArchivePVS = new HashMap<>(); // AA identifier, ArchivePV
98100
for (String alias : aaURLs.keySet()) {
99101
aaArchivePVS.put(alias, new ArrayList<>());
@@ -106,36 +108,49 @@ public void process(List<Channel> channels) throws JsonProcessingException {
106108
.filter(xmlProperty -> archivePropertyName.equalsIgnoreCase(xmlProperty.getName()))
107109
.findFirst();
108110
if (archiveProperty.isPresent()) {
109-
String pvStatus = channel.getProperties().stream()
110-
.filter(xmlProperty -> PV_STATUS_PROPERTY_NAME.equalsIgnoreCase(xmlProperty.getName()))
111-
.findFirst()
112-
.map(Property::getValue)
113-
.orElse(PV_STATUS_INACTIVE);
114-
String archiverAlias = channel.getProperties().stream()
115-
.filter(xmlProperty -> archiverPropertyName.equalsIgnoreCase(xmlProperty.getName()))
116-
.findFirst()
117-
.map(Property::getValue)
118-
.orElse(defaultArchiver);
119-
ArchivePV newArchiverPV = createArchivePV(
120-
policyLists.get(archiverAlias),
121-
channel,
122-
archiveProperty.get().getValue(),
123-
autoPauseOptions.contains(PV_STATUS_PROPERTY_NAME) ? pvStatus : PV_STATUS_ACTIVE);
124-
aaArchivePVS.get(archiverAlias).add(newArchiverPV);
111+
try {
112+
addChannelChange(channel, aaArchivePVS, policyLists, archiveProperty);
113+
} catch (Exception e) {
114+
logger.log(Level.WARNING, String.format("Failed to process %s", channel), e);
115+
}
125116
} else if (autoPauseOptions.contains(archivePropertyName)) {
126117
aaURLs.keySet().forEach(archiverAlias -> aaArchivePVS
127118
.get(archiverAlias)
128119
.add(createArchivePV(List.of(), channel, "", PV_STATUS_INACTIVE)));
129120
}
130121
});
131-
122+
long count = 0;
132123
for (Map.Entry<String, List<ArchivePV>> e : aaArchivePVS.entrySet()) {
133124
String archiverURL = aaURLs.get(e.getKey());
134125
Map<String, ArchivePV> archivePVSList =
135126
e.getValue().stream().collect(Collectors.toMap(archivePV -> archivePV.pv, archivePV -> archivePV));
136127
Map<ArchiveAction, List<ArchivePV>> archiveActionArchivePVMap =
137128
getArchiveActions(archivePVSList, archiverURL);
138-
configureAA(archiveActionArchivePVMap, archiverURL);
129+
count += configureAA(archiveActionArchivePVMap, archiverURL);
130+
}
131+
long finalCount = count;
132+
logger.log(Level.INFO, () -> String.format("Configured %s channels.", finalCount));
133+
return finalCount;
134+
}
135+
136+
private void addChannelChange(Channel channel, Map<String, List<ArchivePV>> aaArchivePVS, Map<String, List<String>> policyLists, Optional<Property> archiveProperty) {
137+
String pvStatus = channel.getProperties().stream()
138+
.filter(xmlProperty -> PV_STATUS_PROPERTY_NAME.equalsIgnoreCase(xmlProperty.getName()))
139+
.findFirst()
140+
.map(Property::getValue)
141+
.orElse(PV_STATUS_INACTIVE);
142+
String archiverAlias = channel.getProperties().stream()
143+
.filter(xmlProperty -> archiverPropertyName.equalsIgnoreCase(xmlProperty.getName()))
144+
.findFirst()
145+
.map(Property::getValue)
146+
.orElse(defaultArchiver);
147+
if (aaArchivePVS.containsKey(archiverAlias) && archiveProperty.isPresent()) {
148+
ArchivePV newArchiverPV = createArchivePV(
149+
policyLists.get(archiverAlias),
150+
channel,
151+
archiveProperty.get().getValue(),
152+
autoPauseOptions.contains(PV_STATUS_PROPERTY_NAME) ? pvStatus : PV_STATUS_ACTIVE);
153+
aaArchivePVS.get(archiverAlias).add(newArchiverPV);
139154
}
140155
}
141156

@@ -217,13 +232,13 @@ private ArchivePV createArchivePV(
217232
return newArchiverPV;
218233
}
219234

220-
private void configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String aaURL)
235+
private long configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String aaURL)
221236
throws JsonProcessingException {
222237
logger.log(Level.INFO, () -> String.format("Configure PVs %s in %s", archivePVS.toString(), aaURL));
223-
238+
long count = 0;
224239
// Don't request to archive an empty list.
225240
if (archivePVS.isEmpty()) {
226-
return;
241+
return count;
227242
}
228243
if (!archivePVS.get(ArchiveAction.ARCHIVE).isEmpty()) {
229244
logger.log(
@@ -234,6 +249,7 @@ private void configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String
234249
objectMapper.writeValueAsString(archivePVS.get(ArchiveAction.ARCHIVE)),
235250
ArchiveAction.ARCHIVE.endpoint,
236251
aaURL);
252+
count += archivePVS.get(ArchiveAction.ARCHIVE).size();
237253
}
238254
if (!archivePVS.get(ArchiveAction.PAUSE).isEmpty()) {
239255
logger.log(
@@ -246,6 +262,7 @@ private void configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String
246262
.collect(Collectors.toList())),
247263
ArchiveAction.PAUSE.endpoint,
248264
aaURL);
265+
count += archivePVS.get(ArchiveAction.PAUSE).size();
249266
}
250267
if (!archivePVS.get(ArchiveAction.RESUME).isEmpty()) {
251268
logger.log(
@@ -258,20 +275,26 @@ private void configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String
258275
.collect(Collectors.toList())),
259276
ArchiveAction.RESUME.endpoint,
260277
aaURL);
278+
count += archivePVS.get(ArchiveAction.RESUME).size();
261279
}
280+
return count;
262281
}
263282

264283
private void submitAction(String values, String endpoint, String aaURL) {
284+
try {
285+
String response = client.post()
286+
.uri(URI.create(aaURL + MGMT_RESOURCE + endpoint))
287+
.contentType(MediaType.APPLICATION_JSON)
288+
.bodyValue(values)
289+
.retrieve()
290+
.bodyToMono(String.class)
291+
.timeout(Duration.of(10, ChronoUnit.SECONDS))
292+
.block();
293+
logger.log(Level.FINE, () -> response);
265294

266-
String response = client.post()
267-
.uri(URI.create(aaURL + MGMT_RESOURCE + endpoint))
268-
.contentType(MediaType.APPLICATION_JSON)
269-
.bodyValue(values)
270-
.retrieve()
271-
.bodyToMono(String.class)
272-
.timeout(Duration.of(10, ChronoUnit.SECONDS))
273-
.block();
274-
logger.log(Level.FINE, () -> response);
295+
} catch (Exception e) {
296+
logger.log(Level.WARNING, String.format("Failed to submit %s to %s on %s", values, endpoint, aaURL), e);
297+
}
275298
}
276299

277300
private Map<String, List<String>> getAAsPolicies(Map<String, String> aaURLs) {

src/main/java/org/phoebus/channelfinder/processors/ChannelProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ public interface ChannelProcessor {
1111

1212
String processorInfo();
1313

14-
void process(List<Channel> channels) throws JsonProcessingException;
14+
long process(List<Channel> channels) throws JsonProcessingException;
1515

1616
}

src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ public static void paramableAAChannelProcessorTest(
114114
.addHeader("Content-Type", "application/json"));
115115
}
116116

117-
aaChannelProcessor.process(List.of(channel));
117+
long count = aaChannelProcessor.process(List.of(channel));
118+
assertEquals(count, archiverEndpoint.isEmpty() ? 0 : 1);
118119

119120
int expectedRequests = 1;
120121
RecordedRequest requestPolicy = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS);

0 commit comments

Comments
 (0)