
Commit 4301c2b (parent: e753e33)

SAMZA-2059: Persist configuration in coordinator stream for standalone.
Prior to Samza 1.0, users plugged in the properties of an I/O system through a configuration file, and Samza applied rewriters in the user-defined order to compute the configuration of a job. Post Samza 1.0, we introduced new abstractions, viz. `StreamDescriptor` and `SystemDescriptor`, to perform configuration expansion for predefined systems at run-time. However, configuration computed at run-time is not persisted in centralized storage in samza-standalone. This breaks tools such as checkpoint-tool and coordinator-stream-writer in standalone. This patch addresses the problem by storing the configuration in the coordinator stream for standalone.

In follow-up PRs:
1. We'll switch from ZooKeeper to the coordinator stream as the JobModel storage layer in standalone.
2. Samza tools (checkpoint-tool) will be migrated to read the configuration from the coordinator stream rather than from disk.

Author: Shanthoosh Venkataraman <[email protected]>
Author: shanthoosh <[email protected]>
Author: Shanthoosh Venkataraman <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>

Closes apache#879 from shanthoosh/standalone-coordinator-stream-for-config and squashes the following commits:

c989a59b [shanthoosh] Merge branch 'master' into standalone-coordinator-stream-for-config
d6290f9e [Shanthoosh Venkataraman] Addressing review comments.
8d0ae13d [Shanthoosh Venkataraman] Fix typo in java doc.
f194a04e [Shanthoosh Venkataraman] SAMZA-2059: Storing configuration in coordinator stream for standalone.
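At a glance, the write path this patch adds is: create the coordinator stream if needed, obtain a MetadataStore from the configured MetadataStoreFactory scoped to the SetConfig message type, and write every config entry serialized with CoordinatorStreamValueSerde. A minimal sketch of that flow, mirroring the storeConfigInCoordinatorStream() method in the ZkJobCoordinator diff below; it assumes a Config named config and a MetricsRegistry named metricsRegistry are already in scope and is not a drop-in snippet:

    // Sketch only: mirrors the persistence flow added in this patch.
    MetadataStoreFactory factory =
        Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
    MetadataStore store = factory.getMetadataStore(SetConfig.TYPE, config, metricsRegistry);
    store.init();
    try {
      CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
      for (Map.Entry<String, String> entry : config.entrySet()) {
        // One coordinator stream message per config entry.
        store.put(entry.getKey(), serde.toBytes(entry.getValue()));
      }
    } finally {
      store.close();
    }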

File tree: 4 files changed (+91, -0 lines)
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java

Lines changed: 7 additions & 0 deletions
@@ -24,6 +24,7 @@
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.SamzaException;
 import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
 import org.apache.samza.serializers.JsonSerde;
@@ -58,6 +59,9 @@ public String fromBytes(byte[] bytes) {
     } else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
       SetChangelogMapping changelogMapping = new SetChangelogMapping(message);
       return String.valueOf(changelogMapping.getPartition());
+    } else if (type.equalsIgnoreCase(SetConfig.TYPE)) {
+      SetConfig setConfig = new SetConfig(message);
+      return setConfig.getConfigValue();
     } else if (type.equalsIgnoreCase(SetTaskModeMapping.TYPE)) {
       SetTaskModeMapping setTaskModeMapping = new SetTaskModeMapping(message);
       return String.valueOf(setTaskModeMapping.getTaskMode());
@@ -80,6 +84,9 @@ public byte[] toBytes(String value) {
     } else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
       SetChangelogMapping changelogMapping = new SetChangelogMapping(SOURCE, "", Integer.valueOf(value));
       return messageSerde.toBytes(changelogMapping.getMessageMap());
+    } else if (type.equalsIgnoreCase(SetConfig.TYPE)) {
+      SetConfig setConfig = new SetConfig(SOURCE, "", value);
+      return messageSerde.toBytes(setConfig.getMessageMap());
     } else {
       throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
     }
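With this change, SetConfig values round-trip through CoordinatorStreamValueSerde the same way the other coordinator stream message types do. A minimal sketch; the config value "256" is illustrative, not from the patch:

    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
    byte[] bytes = serde.toBytes("256");    // wraps the value in a SetConfig message map and JSON-serializes it
    String value = serde.fromBytes(bytes);  // reads the SetConfig message back via getConfigValue()
    // value is again "256"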

samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java

Lines changed: 49 additions & 0 deletions
@@ -44,18 +44,25 @@
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.runtime.LocationId;
 import org.apache.samza.runtime.LocationIdProvider;
 import org.apache.samza.runtime.LocationIdProviderFactory;
 import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
 import org.apache.samza.zk.ZkUtils.ProcessorNode;
@@ -258,6 +265,7 @@ void doOnProcessorChange() {

       // Pass in null Coordinator consumer and producer because ZK doesn't have coordinator streams.
       ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions);
+      storeConfigInCoordinatorStream();
       hasCreatedStreams = true;
     }

@@ -280,6 +288,47 @@ void doOnProcessorChange() {
     debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
   }

+  /**
+   * Stores the configuration of the job in the coordinator stream.
+   */
+  private void storeConfigInCoordinatorStream() {
+    MetadataStore metadataStore = null;
+    try {
+      // Creates the coordinator stream if it does not exist.
+      createCoordinatorStream();
+
+      MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
+      metadataStore = metadataStoreFactory.getMetadataStore(SetConfig.TYPE, config, metrics.getMetricsRegistry());
+      metadataStore.init();
+      CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
+        metadataStore.put(entry.getKey(), serializedValue);
+      }
+    } finally {
+      if (metadataStore != null) {
+        LOG.info("Stopping the coordinator system producer.");
+        metadataStore.close();
+      }
+    }
+  }
+
+  /**
+   * Creates the coordinator stream Kafka topic.
+   */
+  private void createCoordinatorStream() {
+    SystemAdmin coordinatorSystemAdmin = null;
+    SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
+    coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
+    String streamName = coordinatorSystemStream.getStream();
+    StreamSpec coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem());
+    if (coordinatorSystemAdmin.createStream(coordinatorSpec)) {
+      LOG.info("Created coordinator stream: {}.", streamName);
+    } else {
+      LOG.info("Coordinator stream: {} already exists.", streamName);
+    }
+  }
+
   /**
    * Generate new JobModel when becoming a leader or the list of processor changed.
    */

samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java

Lines changed: 2 additions & 0 deletions
@@ -188,6 +188,8 @@ protected Map<String, String> createConfigs(String testSystem, String inputTopic
     configs.put("app.messageCount", String.valueOf(messageCount));
     configs.put("app.outputTopic", outputTopic);
     configs.put("app.outputSystem", testSystem);
+    configs.put("job.coordinator.system", testSystem);
+    configs.put("job.coordinator.replication.factor", "1");
     configs.put(ZkConfig.ZK_CONNECT, zkConnect());

     configs.put("job.systemstreampartition.grouper.factory",
samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

Lines changed: 33 additions & 0 deletions
@@ -25,6 +25,7 @@
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -51,15 +52,20 @@
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
 import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.util.Util;
 import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.apache.samza.zk.ZkKeyBuilder;
 import org.apache.samza.zk.ZkUtils;
@@ -191,6 +197,7 @@ private void publishKafkaEvents(String topic, int startIndex, int endIndex, Stri

   private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic,
       String appName, String appId) {
+    String coordinatorSystemName = "coordinatorSystem";
     Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder()
         .put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS)
         .put(TaskConfig.INPUT_STREAMS(), inputTopic)
@@ -211,10 +218,13 @@ private Map<String, String> buildStreamApplicationConfigMap(String systemName, S
         .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
         .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
         .put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true")
+        .put("job.coordinator.system", coordinatorSystemName)
+        .put("job.coordinator.replication.factor", "1")
         .build();
     Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);

     applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
+    applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(coordinatorSystemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
     return applicationConfig;
   }

@@ -700,6 +710,29 @@ public void testShouldGenerateJobModelOnPartitionCountChange() throws Exception

     // Validate that the input partition count is 100 in the new JobModel.
     Assert.assertEquals(100, ssps.size());
+
+    // Validate that the configuration is stored in the coordinator stream.
+    MapConfig config = getConfigFromCoordinatorStream(applicationConfig1);
+
+    // The execution plan and serialized DAG of a Samza job are stored in the coordinator stream config, so a direct
+    // equals comparison between the application configuration and the coordinator config would fail. Instead, iterate
+    // through the application configuration and verify that each expected entry is present in the coordinator config.
+    for (Map.Entry<String, String> entry : applicationConfig1.entrySet()) {
+      Assert.assertTrue(config.containsKey(entry.getKey()));
+    }
+  }
+
+  private MapConfig getConfigFromCoordinatorStream(Config config) {
+    MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
+    MetadataStore metadataStore = metadataStoreFactory.getMetadataStore("set-config", config, new MetricsRegistryMap());
+    metadataStore.init();
+    Map<String, String> configMap = new HashMap<>();
+    CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde("set-config");
+    metadataStore.all().forEach((key, value) -> {
+        String deserializedValue = jsonSerde.fromBytes(value);
+        configMap.put(key, deserializedValue);
+      });
+    return new MapConfig(configMap);
   }

   private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) {
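The getConfigFromCoordinatorStream helper above is effectively the read path that the follow-up work described in the commit message will reuse: tools such as checkpoint-tool reading job configuration from the coordinator stream rather than from disk.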
