Skip to content

Commit d104013

Browse files
sboryanavina
authored andcommitted
SAMZA-1107:Job model publish
add utils for publishing job model and job model version to ZK. Author: Boris Shkolnik <[email protected]> Author: Boris Shkolnik <[email protected]> Author: navina <[email protected]> Reviewers: Navina Ramesh <[email protected]>, Fred Ji <[email protected]> Closes apache#67 from sborya/JobModelPublish1
1 parent 4d7b3b3 commit d104013

File tree

3 files changed

+128
-5
lines changed

3 files changed

+128
-5
lines changed

samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.samza.zk;
2121

22+
import java.io.IOException;
2223
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.concurrent.TimeUnit;
@@ -27,6 +28,11 @@
2728
import org.I0Itec.zkclient.ZkClient;
2829
import org.I0Itec.zkclient.ZkConnection;
2930
import org.I0Itec.zkclient.exception.ZkInterruptedException;
31+
import org.apache.samza.SamzaException;
32+
import org.apache.samza.job.model.JobModel;
33+
import org.apache.samza.serializers.model.SamzaObjectMapper;
34+
import org.apache.zookeeper.data.Stat;
35+
import org.codehaus.jackson.map.ObjectMapper;
3036
import org.slf4j.Logger;
3137
import org.slf4j.LoggerFactory;
3238

@@ -154,6 +160,44 @@ public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
154160
zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
155161
}
156162

163+
/**
164+
* Publishes new job model into ZK.
165+
* This call should FAIL if the node already exists.
166+
* @param jobModelVersion version of the jobModeL to publish
167+
* @param jobModel jobModel to publish
168+
*
169+
*/
170+
public void publishJobModel(String jobModelVersion, JobModel jobModel) {
171+
try {
172+
ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
173+
String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
174+
LOG.info("pid=" + processorId + " jobModelAsString=" + jobModelStr);
175+
zkClient.createPersistent(keyBuilder.getJobModelPath(jobModelVersion), jobModelStr);
176+
LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion));
177+
} catch (Exception e) {
178+
LOG.error("JobModel publish failed for version=" + jobModelVersion, e);
179+
throw new SamzaException(e);
180+
}
181+
}
182+
183+
/**
184+
* get the job model from ZK by version
185+
* @param jobModelVersion jobModel version to get
186+
* @return job model for this version
187+
*/
188+
public JobModel getJobModel(String jobModelVersion) {
189+
LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
190+
Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
191+
ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
192+
JobModel jm;
193+
try {
194+
jm = mmapper.readValue((String) data, JobModel.class);
195+
} catch (IOException e) {
196+
throw new SamzaException("failed to read JobModel from ZK", e);
197+
}
198+
return jm;
199+
}
200+
157201
/**
158202
* read the jobmodel version from ZK
159203
* @return jobmodel version as a string
@@ -162,6 +206,36 @@ public String getJobModelVersion() {
162206
return zkClient.<String>readData(keyBuilder.getJobModelVersionPath());
163207
}
164208

209+
/**
210+
* publish the version number of the next JobModel
211+
* @param oldVersion - used to validate, that no one has changed the version in the meanwhile.
212+
* @param newVersion - new version.
213+
*/
214+
public void publishJobModelVersion(String oldVersion, String newVersion) {
215+
Stat stat = new Stat();
216+
String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
217+
LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
218+
.getVersion() + ")");
219+
220+
if (currentVersion != null && !currentVersion.equals(oldVersion)) {
221+
throw new SamzaException(
222+
"Someone change JobModelVersion while the leader was generating one: expected" + oldVersion + ", got " + currentVersion);
223+
}
224+
// data version is the ZK version of the data from the ZK.
225+
int dataVersion = stat.getVersion();
226+
try {
227+
stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
228+
} catch (Exception e) {
229+
String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion;
230+
LOG.error(msg, e);
231+
throw new SamzaException(msg);
232+
}
233+
LOG.info("pid=" + processorId +
234+
" published new version: " + newVersion + "; expected data version = " + (dataVersion + 1) + "(actual data version after update = " + stat.getVersion()
235+
+ ")");
236+
}
237+
238+
165239
/**
166240
* verify that given paths exist in ZK
167241
* @param paths - paths to verify or create
@@ -190,5 +264,4 @@ public void deleteRoot() {
190264
zkClient.deleteRecursive(rootPath);
191265
}
192266
}
193-
194267
}

samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,16 @@ public void testParseIdFromPath() {
5050
Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null));
5151
Assert.assertNull(ZkKeyBuilder.parseIdFromPath(""));
5252
}
53+
54+
@Test
55+
public void testJobModelPath() {
56+
57+
ZkKeyBuilder builder = new ZkKeyBuilder("test");
58+
59+
Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_VERSION_PATH, builder.getJobModelVersionPath());
60+
Assert.assertEquals("/test/jobModels", builder.getJobModelPathPrefix());
61+
String version = "2";
62+
Assert.assertEquals("/test/jobModels/" + version, builder.getJobModelPath(version));
63+
Assert.assertEquals("/test/versionBarriers", builder.getJobModelVersionBarrierPrefix());
64+
}
5365
}

samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@
1818
*/
1919
package org.apache.samza.zk;
2020

21+
import java.util.HashMap;
22+
import java.util.Map;
2123
import java.util.function.BooleanSupplier;
2224
import org.I0Itec.zkclient.IZkDataListener;
2325
import org.I0Itec.zkclient.ZkClient;
2426
import org.I0Itec.zkclient.ZkConnection;
2527
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
28+
import org.apache.samza.SamzaException;
29+
import org.apache.samza.config.MapConfig;
30+
import org.apache.samza.job.model.ContainerModel;
31+
import org.apache.samza.job.model.JobModel;
2632
import org.apache.samza.testUtils.EmbeddedZookeeper;
2733
import org.junit.After;
2834
import org.junit.AfterClass;
@@ -60,7 +66,6 @@ public void testSetup() {
6066
// Do nothing
6167
}
6268

63-
6469
zkUtils = new ZkUtils(
6570
KEY_BUILDER,
6671
zkClient,
@@ -96,11 +101,9 @@ public void testRegisterProcessorId() {
96101
public void testGetActiveProcessors() {
97102
Assert.assertEquals(0, zkUtils.getSortedActiveProcessors().size());
98103
zkUtils.registerProcessorAndGetId("processorData");
99-
100104
Assert.assertEquals(1, zkUtils.getSortedActiveProcessors().size());
101-
102105
}
103-
106+
104107
@Test
105108
public void testSubscribeToJobModelVersionChange() {
106109

@@ -157,6 +160,41 @@ public void handleDataDeleted(String dataPath)
157160
Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000));
158161
}
159162

163+
@Test
164+
public void testPublishNewJobModel() {
165+
ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
166+
String root = keyBuilder.getRootPath();
167+
zkClient.deleteRecursive(root);
168+
String version = "1";
169+
String oldVersion = "0";
170+
171+
zkUtils.makeSurePersistentPathsExists(
172+
new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
173+
174+
zkUtils.publishJobModelVersion(oldVersion, version);
175+
Assert.assertEquals(version, zkUtils.getJobModelVersion());
176+
177+
String newerVersion = Long.toString(Long.valueOf(version) + 1);
178+
zkUtils.publishJobModelVersion(version, newerVersion);
179+
Assert.assertEquals(newerVersion, zkUtils.getJobModelVersion());
180+
181+
try {
182+
zkUtils.publishJobModelVersion(oldVersion, "10"); //invalid new version
183+
Assert.fail("publish invalid version should've failed");
184+
} catch (SamzaException e) {
185+
// expected
186+
}
187+
188+
// create job model
189+
Map<String, String> configMap = new HashMap<>();
190+
Map<Integer, ContainerModel> containers = new HashMap<>();
191+
MapConfig config = new MapConfig(configMap);
192+
JobModel jobModel = new JobModel(config, containers);
193+
194+
zkUtils.publishJobModel(version, jobModel);
195+
Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
196+
}
197+
160198
public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
161199
long delay = startDelayMs;
162200
while (delay < maxDelayMs) {

0 commit comments

Comments
 (0)