Skip to content

Commit ce23fc9

Browse files
committed
chore: error log on stream fail, retry cancels, 5000ms default deadline
Signed-off-by: Todd Baert <[email protected]>
1 parent ac8a4db commit ce23fc9

File tree

5 files changed

+11
-8
lines changed

5 files changed

+11
-8
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResources.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public void waitForInitialization(long deadline) {
6161
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
6262
if (now >= end) {
6363
throw new GeneralError(String.format(
64-
"Deadline exceeded. Condition did not complete within the %d ms deadline", deadline));
64+
"Startup timeout exceeded. Initialization did not complete within the %d ms deadline",
65+
deadline));
6566
}
6667
long remaining = end - now;
6768
synchronized (this) {

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@ public class ChannelBuilder {
5858
"retryableStatusCodes",
5959
Arrays.asList(
6060
/*
61-
* All codes are retryable except OK, CANCELLED and DEADLINE_EXCEEDED since
61+
* All codes are retryable except OK and DEADLINE_EXCEEDED since
6262
* any others not listed here cause a very tight loop of retries.
63-
* CANCELLED is not retryable because it is a client-side termination.
6463
* DEADLINE_EXCEEDED is typically a result of a client specified deadline,
6564
* and definitionally should not result in a tight loop (it's a timeout).
6665
*/
66+
Code.CANCELLED.toString(),
6767
Code.UNKNOWN.toString(),
6868
Code.INVALID_ARGUMENT.toString(),
6969
Code.NOT_FOUND.toString(),

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/QueueingStreamObserver.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class QueueingStreamObserver<T> implements StreamObserver<T> {
1717

1818
public QueueingStreamObserver(final BlockingQueue<StreamResponseModel<T>> queue) {
1919
blockingQueue = queue;
20+
queue.clear();
2021
}
2122

2223
@Override

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ private void observeSyncStream() throws InterruptedException {
128128
// create a context which exists to track and cancel the stream
129129
try (CancellableContext context = Context.current().withCancellation()) {
130130

131-
restart(); // start the stream within the context
131+
restart(); // start the stream with the context
132132

133133
// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
134134
if (!syncMetadataDisabled) {
@@ -150,16 +150,16 @@ private void observeSyncStream() throws InterruptedException {
150150
while (!shutdown.get() && !Context.current().isCancelled()) {
151151
final StreamResponseModel<SyncFlagsResponse> taken = incomingQueue.take();
152152
if (taken.isComplete()) {
153-
log.debug("Sync stream completed, will reconnect");
153+
log.debug("Sync stream completed, will restart");
154154
// The stream is complete, we still try to reconnect
155155
break;
156156
}
157157

158158
Throwable streamException = taken.getError();
159159
if (streamException != null) {
160-
log.debug("Exception in GRPC connection, streamException {}, will reconnect", streamException);
161-
if (!outgoingQueue.offer(new QueuePayload(
162-
QueuePayloadType.ERROR, "Error from stream or metadata", metadataResponse))) {
160+
log.error("Exception in stream RPC, streamException {}, will restart", streamException);
161+
if (!outgoingQueue.offer(
162+
new QueuePayload(QueuePayloadType.ERROR, "Error from stream: ", metadataResponse))) {
163163
log.error("Failed to convey ERROR status, queue is full");
164164
}
165165
break;

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class SyncStreamQueueSourceTest {
4040
@BeforeEach
4141
public void init() throws Exception {
4242
blockingStub = mock(FlagSyncServiceBlockingStub.class);
43+
when(blockingStub.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStub);
4344
when(blockingStub.getMetadata(any())).thenReturn(GetMetadataResponse.getDefaultInstance());
4445

4546
mockConnector = mock(ChannelConnector.class);

0 commit comments

Comments
 (0)