Skip to content

Commit c7e5dcb

Browse files
committed
SAMZA-2026: Refactor remote table API to separate retry policy settings
As per subject, the goal is to make configuration of retry policies consistent with other API's. Author: Wei Song <[email protected]> Reviewers: Aditya Toomula <[email protected]> Closes apache#842 from weisong44/SAMZA-2026
1 parent cc314be commit c7e5dcb

File tree

2 files changed

+12
-17
lines changed

2 files changed

+12
-17
lines changed

samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -125,29 +125,25 @@ public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> wr
125125
}
126126

127127
/**
128-
* Use specified TableReadFunction with remote table.
129-
* @param readFn read function instance
130-
* @param retryPolicy retry policy for the read function
128+
* Use specified {@link TableRetryPolicy} with the {@link TableReadFunction}.
129+
* @param retryPolicy retry policy for the write function
131130
* @return this table descriptor instance
132131
*/
133-
public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) {
132+
public RemoteTableDescriptor<K, V> withReadRetryPolicy(TableRetryPolicy retryPolicy) {
134133
Preconditions.checkNotNull(readFn, "null read function");
135134
Preconditions.checkNotNull(retryPolicy, "null retry policy");
136-
this.readFn = readFn;
137135
this.readRetryPolicy = retryPolicy;
138136
return this;
139137
}
140138

141139
/**
142-
* Use specified TableWriteFunction with remote table.
143-
* @param writeFn write function instance
140+
* Use specified {@link TableRetryPolicy} with the {@link TableWriteFunction}.
144141
* @param retryPolicy retry policy for the write function
145142
* @return this table descriptor instance
146143
*/
147-
public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) {
144+
public RemoteTableDescriptor<K, V> withWriteRetryPolicy(TableRetryPolicy retryPolicy) {
148145
Preconditions.checkNotNull(writeFn, "null write function");
149146
Preconditions.checkNotNull(retryPolicy, "null retry policy");
150-
this.writeFn = writeFn;
151147
this.writeRetryPolicy = retryPolicy;
152148
return this;
153149
}

samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -175,12 +175,11 @@ public int getCredits(K key, V value) {
175175

176176
private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean rlGets, boolean rlPuts) {
177177
int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0);
178-
RemoteTableDescriptor<String, String> desc = new RemoteTableDescriptor("1");
179-
TableRetryPolicy retryPolicy = new TableRetryPolicy();
180-
retryPolicy.withRetryPredicate((ex) -> false);
181-
desc.withReadFunction(createMockTableReadFunction(), retryPolicy);
182-
desc.withWriteFunction(createMockTableWriteFunction());
183-
desc.withAsyncCallbackExecutorPoolSize(10);
178+
RemoteTableDescriptor<String, String> desc = new RemoteTableDescriptor("1")
179+
.withReadFunction(createMockTableReadFunction())
180+
.withReadRetryPolicy(new TableRetryPolicy().withRetryPredicate((ex) -> false))
181+
.withWriteFunction(createMockTableWriteFunction())
182+
.withAsyncCallbackExecutorPoolSize(10);
184183

185184
if (rateOnly) {
186185
if (rlGets) {
@@ -218,8 +217,8 @@ private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean r
218217
ThreadPoolExecutor callbackExecutor = (ThreadPoolExecutor) rwTable.getCallbackExecutor();
219218
Assert.assertEquals(10, callbackExecutor.getCorePoolSize());
220219

221-
Assert.assertNotNull(rwTable.getReadFn() instanceof RetriableReadFunction);
222-
Assert.assertNotNull(!(rwTable.getWriteFn() instanceof RetriableWriteFunction));
220+
Assert.assertTrue(rwTable.getReadFn() instanceof RetriableReadFunction);
221+
Assert.assertFalse(rwTable.getWriteFn() instanceof RetriableWriteFunction);
223222
}
224223

225224
@Test

0 commit comments

Comments
 (0)