Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@ public String processorInfo() {
* If the pvStatus auto pause is set, then the pv will be auto pause resumed as well.
*
* @param channels List of channels
* @return Return number of channels processed
* @throws JsonProcessingException If processing archiver responses fail.
*/
@Override
public void process(List<Channel> channels) throws JsonProcessingException {
public long process(List<Channel> channels) throws JsonProcessingException {
if (channels.isEmpty()) {
return;
return 0;
}

Map<String, List<ArchivePV>> aaArchivePVS = new HashMap<>(); // AA identifier, ArchivePV
for (String alias : aaURLs.keySet()) {
aaArchivePVS.put(alias, new ArrayList<>());
Expand All @@ -106,36 +108,49 @@ public void process(List<Channel> channels) throws JsonProcessingException {
.filter(xmlProperty -> archivePropertyName.equalsIgnoreCase(xmlProperty.getName()))
.findFirst();
if (archiveProperty.isPresent()) {
String pvStatus = channel.getProperties().stream()
.filter(xmlProperty -> PV_STATUS_PROPERTY_NAME.equalsIgnoreCase(xmlProperty.getName()))
.findFirst()
.map(Property::getValue)
.orElse(PV_STATUS_INACTIVE);
String archiverAlias = channel.getProperties().stream()
.filter(xmlProperty -> archiverPropertyName.equalsIgnoreCase(xmlProperty.getName()))
.findFirst()
.map(Property::getValue)
.orElse(defaultArchiver);
ArchivePV newArchiverPV = createArchivePV(
policyLists.get(archiverAlias),
channel,
archiveProperty.get().getValue(),
autoPauseOptions.contains(PV_STATUS_PROPERTY_NAME) ? pvStatus : PV_STATUS_ACTIVE);
aaArchivePVS.get(archiverAlias).add(newArchiverPV);
try {
addChannelChange(channel, aaArchivePVS, policyLists, archiveProperty);
} catch (Exception e) {
logger.log(Level.WARNING, String.format("Failed to process %s", channel), e);
}
} else if (autoPauseOptions.contains(archivePropertyName)) {
aaURLs.keySet().forEach(archiverAlias -> aaArchivePVS
.get(archiverAlias)
.add(createArchivePV(List.of(), channel, "", PV_STATUS_INACTIVE)));
}
});

long count = 0;
for (Map.Entry<String, List<ArchivePV>> e : aaArchivePVS.entrySet()) {
String archiverURL = aaURLs.get(e.getKey());
Map<String, ArchivePV> archivePVSList =
e.getValue().stream().collect(Collectors.toMap(archivePV -> archivePV.pv, archivePV -> archivePV));
Map<ArchiveAction, List<ArchivePV>> archiveActionArchivePVMap =
getArchiveActions(archivePVSList, archiverURL);
configureAA(archiveActionArchivePVMap, archiverURL);
count += configureAA(archiveActionArchivePVMap, archiverURL);
}
long finalCount = count;
logger.log(Level.INFO, () -> String.format("Configured %s channels.", finalCount));
return finalCount;
}

private void addChannelChange(Channel channel, Map<String, List<ArchivePV>> aaArchivePVS, Map<String, List<String>> policyLists, Optional<Property> archiveProperty) {
String pvStatus = channel.getProperties().stream()
.filter(xmlProperty -> PV_STATUS_PROPERTY_NAME.equalsIgnoreCase(xmlProperty.getName()))
.findFirst()
.map(Property::getValue)
.orElse(PV_STATUS_INACTIVE);
String archiverAlias = channel.getProperties().stream()
.filter(xmlProperty -> archiverPropertyName.equalsIgnoreCase(xmlProperty.getName()))
.findFirst()
.map(Property::getValue)
.orElse(defaultArchiver);
if (aaArchivePVS.containsKey(archiverAlias) && archiveProperty.isPresent()) {
ArchivePV newArchiverPV = createArchivePV(
policyLists.get(archiverAlias),
channel,
archiveProperty.get().getValue(),
autoPauseOptions.contains(PV_STATUS_PROPERTY_NAME) ? pvStatus : PV_STATUS_ACTIVE);
aaArchivePVS.get(archiverAlias).add(newArchiverPV);
}
}

Expand Down Expand Up @@ -217,13 +232,13 @@ private ArchivePV createArchivePV(
return newArchiverPV;
}

private void configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String aaURL)
private long configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String aaURL)
throws JsonProcessingException {
logger.log(Level.INFO, () -> String.format("Configure PVs %s in %s", archivePVS.toString(), aaURL));

long count = 0;
// Don't request to archive an empty list.
if (archivePVS.isEmpty()) {
return;
return count;
}
if (!archivePVS.get(ArchiveAction.ARCHIVE).isEmpty()) {
logger.log(
Expand All @@ -234,6 +249,7 @@ private void configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String
objectMapper.writeValueAsString(archivePVS.get(ArchiveAction.ARCHIVE)),
ArchiveAction.ARCHIVE.endpoint,
aaURL);
count += archivePVS.get(ArchiveAction.ARCHIVE).size();
}
if (!archivePVS.get(ArchiveAction.PAUSE).isEmpty()) {
logger.log(
Expand All @@ -246,6 +262,7 @@ private void configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String
.collect(Collectors.toList())),
ArchiveAction.PAUSE.endpoint,
aaURL);
count += archivePVS.get(ArchiveAction.PAUSE).size();
}
if (!archivePVS.get(ArchiveAction.RESUME).isEmpty()) {
logger.log(
Expand All @@ -258,20 +275,26 @@ private void configureAA(Map<ArchiveAction, List<ArchivePV>> archivePVS, String
.collect(Collectors.toList())),
ArchiveAction.RESUME.endpoint,
aaURL);
count += archivePVS.get(ArchiveAction.RESUME).size();
}
return count;
}

private void submitAction(String values, String endpoint, String aaURL) {
try {
String response = client.post()
.uri(URI.create(aaURL + MGMT_RESOURCE + endpoint))
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(values)
.retrieve()
.bodyToMono(String.class)
.timeout(Duration.of(10, ChronoUnit.SECONDS))
.block();
logger.log(Level.FINE, () -> response);

String response = client.post()
.uri(URI.create(aaURL + MGMT_RESOURCE + endpoint))
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(values)
.retrieve()
.bodyToMono(String.class)
.timeout(Duration.of(10, ChronoUnit.SECONDS))
.block();
logger.log(Level.FINE, () -> response);
} catch (Exception e) {
logger.log(Level.WARNING, String.format("Failed to submit %s to %s on %s", values, endpoint, aaURL), e);
}
}

private Map<String, List<String>> getAAsPolicies(Map<String, String> aaURLs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ public interface ChannelProcessor {

String processorInfo();

void process(List<Channel> channels) throws JsonProcessingException;
long process(List<Channel> channels) throws JsonProcessingException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public static void paramableAAChannelProcessorTest(
.addHeader("Content-Type", "application/json"));
}

aaChannelProcessor.process(List.of(channel));
long count = aaChannelProcessor.process(List.of(channel));
assertEquals(count, archiverEndpoint.isEmpty() ? 0 : 1);

int expectedRequests = 1;
RecordedRequest requestPolicy = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS);
Expand Down