diff --git a/src/main/java/org/phoebus/channelfinder/processors/aa/ArchiveAction.java b/src/main/java/org/phoebus/channelfinder/processors/aa/ArchiveAction.java index 827b968e..150277fa 100644 --- a/src/main/java/org/phoebus/channelfinder/processors/aa/ArchiveAction.java +++ b/src/main/java/org/phoebus/channelfinder/processors/aa/ArchiveAction.java @@ -1,6 +1,6 @@ package org.phoebus.channelfinder.processors.aa; -enum ArchiveAction { +public enum ArchiveAction { ARCHIVE("/archivePV"), PAUSE("/pauseArchivingPV"), RESUME("/resumeArchivingPV"), diff --git a/src/main/java/org/phoebus/channelfinder/processors/aa/ArchiverClient.java b/src/main/java/org/phoebus/channelfinder/processors/aa/ArchiverClient.java index 8fd2ff5b..4ba456ec 100644 --- a/src/main/java/org/phoebus/channelfinder/processors/aa/ArchiverClient.java +++ b/src/main/java/org/phoebus/channelfinder/processors/aa/ArchiverClient.java @@ -76,7 +76,7 @@ List> getStatuses(Map archivePVS, private List> getStatusesFromPvListQuery(String archiverURL, List pvs) { URI pvStatusURI = UriComponentsBuilder.fromUri(URI.create(archiverURL + PV_STATUS_RESOURCE)) - .queryParam("pv", pvs) + .queryParam("pv", String.join(",", pvs)) .build() .toUri(); diff --git a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorIT.java b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorIT.java index a6a93a9f..9f9ed0bd 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorIT.java @@ -86,7 +86,7 @@ public static void paramableAAChannelProcessorTest( MockWebServer mockArchiverAppliance, ObjectMapper objectMapper, ChannelProcessor aaChannelProcessor, - Channel channel, + List channels, String archiveStatus, String archiverEndpoint, String submissionBody) @@ -107,7 +107,7 @@ public static void paramableAAChannelProcessorTest( // Request to archiver status List> archivePVStatuses = - List.of(Map.of("pvName", channel.getName(), "status", archiveStatus)); + channels.stream().map(channel -> Map.of("pvName", channel.getName(), "status", archiveStatus)).toList(); mockArchiverAppliance.enqueue(new MockResponse() .setBody(objectMapper.writeValueAsString(archivePVStatuses)) .addHeader("Content-Type", "application/json")); @@ -115,14 +115,14 @@ public static void paramableAAChannelProcessorTest( if (!archiverEndpoint.isEmpty()) { // Request to archiver to archive List> archiverResponse = - List.of(Map.of("pvName", channel.getName(), "status", "Archive request submitted")); + channels.stream().map(channel -> Map.of("pvName", channel.getName(), "status", "Archive request submitted")).toList(); mockArchiverAppliance.enqueue(new MockResponse() .setBody(objectMapper.writeValueAsString(archiverResponse)) .addHeader("Content-Type", "application/json")); } - long count = aaChannelProcessor.process(List.of(channel)); - assertEquals(count, archiverEndpoint.isEmpty() ? 0 : 1); + long count = aaChannelProcessor.process(channels); + assertEquals(count, archiverEndpoint.isEmpty() ? 0 : channels.size()); int expectedRequests = 1; RecordedRequest requestVersion = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); @@ -182,7 +182,7 @@ void testProcessNotArchivedActive( mockArchiverAppliance, objectMapper, aaChannelProcessor, - channel, + List.of(channel), archiveStatus, archiverEndpoint, submissionBody); diff --git a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorMultiIT.java b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorMultiIT.java new file mode 100644 index 00000000..32ae1387 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorMultiIT.java @@ -0,0 +1,205 @@ +package org.phoebus.channelfinder.processors; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.processors.aa.AAChannelProcessor; +import org.phoebus.channelfinder.processors.aa.ArchiveAction; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; +import org.springframework.test.context.TestPropertySource; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.phoebus.channelfinder.processors.AAChannelProcessorIT.activeProperty; +import static org.phoebus.channelfinder.processors.AAChannelProcessorIT.archiveProperty; +import static org.phoebus.channelfinder.processors.AAChannelProcessorIT.inactiveProperty; + +@WebMvcTest(AAChannelProcessor.class) +@TestPropertySource(value = "classpath:application_test.properties") +class AAChannelProcessorMultiIT { + + public static final String BEING_ARCHIVED = "Being archived"; + public static final String PAUSED = "Paused"; + public static final String NOT_BEING_ARCHIVED = "Not being archived"; + public static final String OWNER = "owner"; + @Autowired + AAChannelProcessor aaChannelProcessor; + + MockWebServer mockArchiverAppliance; + ObjectMapper objectMapper; + + @BeforeEach + void setUp() throws IOException { + mockArchiverAppliance = new MockWebServer(); + mockArchiverAppliance.start(17665); + + objectMapper = new ObjectMapper(); + } + + @AfterEach + void teardown() throws IOException { + mockArchiverAppliance.shutdown(); + } + + static Stream provideArguments() { + List channels = List.of( + new Channel("PVArchivedActive", OWNER, List.of(archiveProperty, activeProperty), List.of()), + new Channel("PVPausedActive", OWNER, List.of(archiveProperty, activeProperty), List.of()), + new Channel("PVNoneActive0", OWNER, List.of(archiveProperty, activeProperty), List.of()), + new Channel("PVArchivedInactive", OWNER, List.of(archiveProperty, inactiveProperty), List.of()), + new Channel("PVPausedInactive", OWNER, List.of(archiveProperty, inactiveProperty), List.of()), + new Channel("PVNoneInactive", OWNER, List.of(archiveProperty, inactiveProperty), List.of()), + new Channel("PVArchivedNotag", OWNER, List.of(), List.of()), + new Channel("PVNoneActive1", OWNER, List.of(archiveProperty, activeProperty), List.of()), + new Channel("PVNoneActive2", OWNER, List.of(archiveProperty, activeProperty), List.of()) + ); + + Map namesToStatuses = Map.of( + "PVArchivedActive", BEING_ARCHIVED, + "PVPausedActive", PAUSED, + "PVNoneActive0", NOT_BEING_ARCHIVED, + "PVArchivedInactive", BEING_ARCHIVED, + "PVPausedInactive", PAUSED, + "PVNoneInactive", NOT_BEING_ARCHIVED, + "PVArchivedNotag", BEING_ARCHIVED, + "PVNoneActive1", NOT_BEING_ARCHIVED, + "PVNoneActive2", NOT_BEING_ARCHIVED + ); + Map> actionsToNames = Map.of( + ArchiveAction.RESUME, List.of("PVPausedActive"), + ArchiveAction.PAUSE, List.of("PVArchivedInactive", "PVArchivedNotag"), + ArchiveAction.ARCHIVE, List.of("PVNoneActive0", "PVNoneActive1", "PVNoneActive2") + ); + int expectedProcessedChannels = 6; + + List massPVNames = IntStream.range(1, 100).mapToObj(i -> "PV" + i).toList(); + return Stream.of( + Arguments.of(channels, namesToStatuses, actionsToNames, expectedProcessedChannels), + Arguments.of( + massPVNames.stream().map(s -> new Channel(s, OWNER, List.of(archiveProperty, activeProperty), List.of())).toList(), + massPVNames.stream().collect(Collectors.toMap(String::toString, e -> NOT_BEING_ARCHIVED)), + Map.of(ArchiveAction.ARCHIVE, massPVNames), + massPVNames.size() + )); + } + + @ParameterizedTest + @MethodSource("provideArguments") + void testProcessMulti(List channels, + Map namesToStatuses, + Map> actionsToNames, + int expectedProcessedChannels) + throws JsonProcessingException, InterruptedException { + + // Request to version + Map versions = Map.of("mgmt_version", "Archiver Appliance Version 1.1.0"); + mockArchiverAppliance.enqueue(new MockResponse() + .setBody(objectMapper.writeValueAsString(versions)) + .addHeader("Content-Type", "application/json")); + + // Request to policies + Map policyList = Map.of("policy", "description"); + mockArchiverAppliance.enqueue(new MockResponse() + .setBody(objectMapper.writeValueAsString(policyList)) + .addHeader("Content-Type", "application/json")); + + // Request to archiver status + List> archivePVStatuses = + namesToStatuses.entrySet().stream().map(entry -> Map.of("pvName", entry.getKey(), "status", entry.getValue())).toList(); + mockArchiverAppliance.enqueue(new MockResponse() + .setBody(objectMapper.writeValueAsString(archivePVStatuses)) + .addHeader("Content-Type", "application/json")); + + // Requests to archiver + actionsToNames.forEach((key, value) -> { + List> archiverResponse = + value.stream().map(channel -> Map.of("pvName", channel, "status", key + " request submitted")).toList(); + try { + mockArchiverAppliance.enqueue(new MockResponse() + .setBody(objectMapper.writeValueAsString(archiverResponse)) + .addHeader("Content-Type", "application/json")); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + + long count = aaChannelProcessor.process(channels); + assertEquals(count, expectedProcessedChannels); + + AtomicInteger expectedRequests = new AtomicInteger(1); + RecordedRequest requestVersion = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); + assert requestVersion != null; + assertEquals("/mgmt/bpl/getVersions", requestVersion.getPath()); + + expectedRequests.addAndGet(1); + RecordedRequest requestPolicy = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); + assert requestPolicy != null; + assertEquals("/mgmt/bpl/getPolicyList", requestPolicy.getPath()); + + expectedRequests.addAndGet(1); + RecordedRequest requestStatus = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); + assert requestStatus != null; + assert requestStatus.getRequestUrl() != null; + assertEquals("/mgmt/bpl/getPVStatus", requestStatus.getRequestUrl().encodedPath()); + String pvStatusRequestParameter = requestStatus.getRequestUrl().queryParameter("pv"); + namesToStatuses.keySet().forEach( + name -> { + assert pvStatusRequestParameter != null; + assertTrue(pvStatusRequestParameter.contains(name)); + } + ); + + while (mockArchiverAppliance.getRequestCount() > 0) { + RecordedRequest requestAction = null; + try { + requestAction = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (requestAction == null) { + break; + } + expectedRequests.addAndGet(1); + assert requestAction.getPath() != null; + assertTrue(requestAction.getPath().startsWith("/mgmt/bpl")); + ArchiveAction key = actionFromEndpoint(requestAction.getPath().substring("/mgmt/bpl".length())); + String body = requestAction.getBody().readUtf8(); + actionsToNames.get(key).forEach(pv -> + assertTrue(body.contains(pv)) + ); + } + + assertEquals(mockArchiverAppliance.getRequestCount(), expectedRequests.get()); + } + + + public ArchiveAction actionFromEndpoint(final String endpoint) { + for (ArchiveAction action : ArchiveAction.values()) { + if (action.getEndpoint().equals(endpoint)) { + return action; + } + } + return null; + } +} diff --git a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorNoDefaultIT.java b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorNoDefaultIT.java index efd5f36e..c5d3de56 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorNoDefaultIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorNoDefaultIT.java @@ -75,7 +75,7 @@ void testProcessNotArchivedActive( mockArchiverAppliance, objectMapper, aaChannelProcessor, - channel, + List.of(channel), archiveStatus, archiverEndpoint, submissionBody diff --git a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorNoPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorNoPauseIT.java index a550ff2e..75fe7bf1 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorNoPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorNoPauseIT.java @@ -73,7 +73,7 @@ void testProcessNotArchivedActive( mockArchiverAppliance, objectMapper, aaChannelProcessor, - channel, + List.of(channel), archiveStatus, archiverEndpoint, submissionBody); diff --git a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorStatusPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorStatusPauseIT.java index 273cb9e4..2168d1ca 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorStatusPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorStatusPauseIT.java @@ -75,7 +75,7 @@ void testProcessNotArchivedActive( mockArchiverAppliance, objectMapper, aaChannelProcessor, - channel, + List.of(channel), archiveStatus, archiverEndpoint, submissionBody); diff --git a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorTagPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorTagPauseIT.java index e92d6661..e5dc9a79 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorTagPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorTagPauseIT.java @@ -74,7 +74,7 @@ void testProcessNotArchivedActive( mockArchiverAppliance, objectMapper, aaChannelProcessor, - channel, + List.of(channel), archiveStatus, archiverEndpoint, submissionBody