Skip to content

Commit a40bd43

Browse files
authored
Merge pull request #370 from rabbitmq/environment-stream-exists
Add Environment#streamExists(String)
2 parents a25a31f + b30be70 commit a40bd43

File tree

3 files changed

+50
-18
lines changed

3 files changed

+50
-18
lines changed

src/main/java/com/rabbitmq/stream/Environment.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ static EnvironmentBuilder builder() {
7272
*/
7373
StreamStats queryStreamStats(String stream);
7474

75+
/**
76+
* Return whether a stream exists or not.
77+
*
78+
* @param stream
79+
* @return true if stream exists, false if it does not exist
80+
* @throws StreamException if response code is different from {@link Constants#RESPONSE_CODE_OK}
81+
* or {@link Constants#RESPONSE_CODE_STREAM_DOES_NOT_EXIST}
82+
*/
83+
boolean streamExists(String stream);
84+
7585
/**
7686
* Create a {@link ProducerBuilder} to configure and create a {@link Producer}.
7787
*

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,11 @@
1717
import static com.rabbitmq.stream.impl.Utils.exceptionMessage;
1818
import static com.rabbitmq.stream.impl.Utils.formatConstant;
1919
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
20+
import static java.lang.String.format;
2021
import static java.util.concurrent.TimeUnit.SECONDS;
2122

22-
import com.rabbitmq.stream.Address;
23-
import com.rabbitmq.stream.AddressResolver;
24-
import com.rabbitmq.stream.BackOffDelayPolicy;
25-
import com.rabbitmq.stream.Codec;
26-
import com.rabbitmq.stream.ConsumerBuilder;
27-
import com.rabbitmq.stream.Environment;
28-
import com.rabbitmq.stream.MessageHandler;
23+
import com.rabbitmq.stream.*;
2924
import com.rabbitmq.stream.MessageHandler.Context;
30-
import com.rabbitmq.stream.NoOffsetException;
31-
import com.rabbitmq.stream.OffsetSpecification;
32-
import com.rabbitmq.stream.ProducerBuilder;
33-
import com.rabbitmq.stream.StreamCreator;
34-
import com.rabbitmq.stream.StreamException;
35-
import com.rabbitmq.stream.StreamStats;
36-
import com.rabbitmq.stream.SubscriptionListener;
3725
import com.rabbitmq.stream.compression.CompressionCodecFactory;
3826
import com.rabbitmq.stream.impl.Client.ClientParameters;
3927
import com.rabbitmq.stream.impl.Client.ShutdownListener;
@@ -515,12 +503,35 @@ public StreamStats queryStreamStats(String stream) {
515503
response.getResponseCode(),
516504
stream,
517505
() ->
518-
"Error while querying stream info: "
506+
"Error while querying stream stats: "
519507
+ formatConstant(response.getResponseCode())
520508
+ ".");
521509
}
522510
}
523511

512+
@Override
513+
public boolean streamExists(String stream) {
514+
checkNotClosed();
515+
this.maybeInitializeLocator();
516+
StreamStatsResponse response =
517+
locatorOperation(
518+
Utils.namedFunction(
519+
client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream));
520+
if (response.isOk()) {
521+
return true;
522+
} else if (response.getResponseCode() == Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST) {
523+
return false;
524+
} else {
525+
throw convertCodeToException(
526+
response.getResponseCode(),
527+
stream,
528+
() ->
529+
format(
530+
"Unexpected result when checking if stream '%s' exists: %s.",
531+
stream, formatConstant(response.getResponseCode())));
532+
}
533+
}
534+
524535
private static class DefaultStreamStats implements StreamStats {
525536

526537
private final LongSupplier firstOffsetSupplier, committedOffsetSupplier;

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -559,8 +559,8 @@ void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset(boolean lazyInit)
559559
throws Exception {
560560
try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) {
561561
StreamStats stats = env.queryStreamStats(stream);
562-
assertThatThrownBy(() -> stats.firstOffset()).isInstanceOf(NoOffsetException.class);
563-
assertThatThrownBy(() -> stats.committedChunkId()).isInstanceOf(NoOffsetException.class);
562+
assertThatThrownBy(stats::firstOffset).isInstanceOf(NoOffsetException.class);
563+
assertThatThrownBy(stats::committedChunkId).isInstanceOf(NoOffsetException.class);
564564

565565
int publishCount = 20_000;
566566
TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);
@@ -595,6 +595,16 @@ void queryStreamStatsShouldThrowExceptionWhenStreamDoesNotExist() {
595595
}
596596
}
597597

598+
@ParameterizedTest
599+
@ValueSource(booleans = {true, false})
600+
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
601+
void streamExists(boolean lazyInit) {
602+
try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) {
603+
assertThat(env.streamExists(stream)).isTrue();
604+
assertThat(env.streamExists(UUID.randomUUID().toString())).isFalse();
605+
}
606+
}
607+
598608
@Test
599609
void methodsShouldThrowExceptionWhenEnvironmentIsClosed() {
600610
Environment env = environmentBuilder.build();
@@ -605,7 +615,8 @@ void methodsShouldThrowExceptionWhenEnvironmentIsClosed() {
605615
() -> env.producerBuilder(),
606616
() -> env.consumerBuilder(),
607617
() -> env.deleteStream("does not matter"),
608-
() -> env.queryStreamStats("does not matter")
618+
() -> env.queryStreamStats("does not matter"),
619+
() -> env.streamExists("does not matter")
609620
};
610621
Arrays.stream(calls)
611622
.forEach(call -> assertThatThrownBy(call).isInstanceOf(IllegalStateException.class));

0 commit comments

Comments
 (0)