
Commit 13f2259

refactor(core): unify operation execution pathways and remove unused logic (#220)
* refactor: split orchestrate to enhance maintainability

* fix: typing

* refactor: provide functions via orchestrate

  The refactoring moves the general/explore orchestration functions to private modules for clarity. However, we don't want to expose these private modules externally; we want to keep orchestrate.py as the access point.

* feat(core): Convert RayTaskError to original error

* docs(core): Update exceptions raised by orchestrate_explore_operation

* feat(core): Improve granularity of exception handling

* chore(core): merge log change from main

* refactor: update to new import

* refactor: expose required members via orchestrate

* fix: Access via public interface

* refactor: orchestrate params

  Remove unused parameters.

* refactor: remove project_context arg

  Not needed, as we must use the one given by discover_space.project_context.

* refactor(orchestration): remove queue arg from orchestrate and orchestrate_explore_operation

  It was never anything other than None, so it is not required as a parameter.

* refactor(orchestration): rename param execid -> namespace

* refactor(orchestration): remove queue arg

* refactor(orchestration): Remove BaseOperationRunConfiguration

  Originally required as it held common functions for two sub-classes. However, one of those was removed, meaning we don't need a base class anymore and can simplify the code.

* chore(comments): delete comment

* refactor(orchestrate): move acquiring ac_config

  From orchestrate to general_orchestration. This is because:
  - orchestrate_explore_operation already does this, so it doesn't need it from orchestrate
  - the information is not passed from orchestrate to orchestrate_general_operation
  - orchestrate_general_operation should have this info, but it needs to acquire it itself in case the function is called directly (not via orchestrate)

  Hence, it's redundant to have this code in orchestrate. The correct place for it is orchestrate_general_operation.

* refactor(orchestrate): actuator configuration validation

  Extract functions for getting and validating actuator configurations. Rename methods. The reason is that, as is, the simplified get/validate is tied to DiscoveryOperationResourceConfiguration, which restricts it to places where an instance of the class exists.

* fix: imports

* fix: temporarily use private module

* refactor(orchestration): add_operation_from_configuration_to_metastore now takes more basic components and is called create_operation_and_add_to_metastore

* refactor(orchestration): _run_operation_harness

  Update parameters to take operator module/parameters/info rather than having them bundled in a DiscoveryOperationConfiguration instance. Update to new create_operation_and_add_to_metastore interface.

* refactor(orchestration): orchestrate_explore_operation

  Update parameters to take operator module/parameters/info rather than a DiscoveryOperationResourceConfiguration instance. This is so the parameters are as similar as possible to orchestrate_general_operation, making the paths to each more unified.

  Remove namespace parameter: the orchestrate function will generate its own namespace for its operation.

  Simplify return to OperationOutput: the other instances returned are deducible from this or were inputs.

  Remove orchestrate_operation_function_wrapper: no longer necessary due to the change in orchestrate_explore_operation input/output.

* refactor(orchestration): setup operator

  Change parameters to take operator_module and parameters instead of requiring them to be bundled in DiscoveryOperationResourceConfiguration.

* fix(orchestration): Check operation_info namespace

* feat(core): Add namespace field to FunctionOperation

  To allow communicating it to a general operation that wants to create ray actors/remotes.

* refactor(orchestrate): update to new interface

  No need to call wrapper or set namespace explicitly.

* fix(orchestrate): Pass namespace to general operators

  Also update to new run_operation_harness.

* refactor(orchestrate): Update orchestrate

  - Don't namespace at orchestrate level; let the orchestration functions set the namespace
  - Update to new orchestrate_*_operation interfaces
  - Remove orchestrate_operation_function, which is no longer necessary

* refactor(orchestrate): Remove unused parameter

* docs(operator): update to new function

* docs(orchestrate): update docstrings

* chore(core): apply suggestions from code review

* chore(core): change field name

---------

Signed-off-by: Michael Johnston <[email protected]>
1 parent ece2f7d commit 13f2259

14 files changed: +452 -391 lines changed

orchestrator/cli/resources/operation/create.py

Lines changed: 1 addition & 2 deletions
@@ -85,10 +85,9 @@ def create_operation(parameters: AdoCreateCommandParameters):
 
     try:
         operation_output = orchestrator.modules.operators.orchestrate.orchestrate(
-            base_operation_configuration=op_resource_configuration,
+            operation_resource_configuration=op_resource_configuration,
             project_context=parameters.ado_configuration.project_context,
             discovery_space_identifier=op_resource_configuration.spaces[0],
-            discovery_space_configuration=None,
         )
 
     except MeasurementError as e:

orchestrator/core/operation/config.py

Lines changed: 167 additions & 95 deletions
@@ -38,6 +38,140 @@ class DiscoveryOperationEnum(enum.Enum):
     EXPORT = "export"
 
 
+def get_actuator_configurations(
+    project_context: ProjectContext, actuator_configuration_identifiers: list[str]
+) -> list[ActuatorConfiguration]:
+    """Retrieves actuator configurations from the metastore
+
+    Fetches ActuatorConfiguration resources from the metastore using the provided
+    identifiers and validates that each actuator has at most one configuration.
+
+    Params:
+        project_context: Project context for connecting to the metastore
+        actuator_configuration_identifiers: List of identifiers for actuator
+            configuration resources to retrieve
+
+    Returns:
+        List of ActuatorConfiguration instances retrieved from the metastore
+
+    Raises:
+        ValueError: If more than one ActuatorConfiguration references the same actuator
+        ResourceDoesNotExistError: If any of the identifiers is not found in the project.
+    """
+    import orchestrator.metastore.sqlstore
+
+    sql = orchestrator.metastore.sqlstore.SQLStore(project_context=project_context)
+
+    actuator_configurations = [
+        sql.getResource(
+            identifier=identifier,
+            kind=CoreResourceKinds.ACTUATORCONFIGURATION,
+            raise_error_if_no_resource=True,
+        ).config
+        for identifier in actuator_configuration_identifiers
+    ]
+
+    actuator_identifiers = {conf.actuatorIdentifier for conf in actuator_configurations}
+    if len(actuator_identifiers) != len(actuator_configuration_identifiers):
+        raise ValueError("Only one ActuatorConfiguration is permitted per Actuator")
+
+    return actuator_configurations
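
Reviewer note: the one-configuration-per-actuator rule in get_actuator_configurations rests on comparing the size of a set of actuator identifiers with the number of configurations fetched. The sketch below isolates that check so it can be run without a metastore. ActuatorConfigurationStub and check_one_configuration_per_actuator are hypothetical names invented for illustration; only the actuatorIdentifier field and the error message come from the diff.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ActuatorConfigurationStub:
    """Hypothetical stand-in exposing only the field the check reads."""

    actuatorIdentifier: str


def check_one_configuration_per_actuator(configurations):
    """Raise ValueError if two configurations reference the same actuator."""
    actuator_identifiers = {conf.actuatorIdentifier for conf in configurations}
    # A set drops duplicates, so a size mismatch means some actuator is repeated.
    if len(actuator_identifiers) != len(configurations):
        raise ValueError("Only one ActuatorConfiguration is permitted per Actuator")
    return actuator_identifiers


unique = [
    ActuatorConfigurationStub("robotic_lab"),
    ActuatorConfigurationStub("simulator"),
]
duplicated = [*unique, ActuatorConfigurationStub("simulator")]

print(sorted(check_one_configuration_per_actuator(unique)))

try:
    check_one_configuration_per_actuator(duplicated)
except ValueError as error:
    print(f"rejected: {error}")
```

The function in the diff compares against the number of requested identifiers rather than the number of fetched configurations. The two counts agree there because exactly one resource is fetched per identifier.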
+
+
+def validate_actuator_configurations_against_space_configuration(
+    actuator_configurations: list[ActuatorConfiguration],
+    discovery_space_configuration: DiscoverySpaceConfiguration,
+):
+    """Validates that actuator configurations are compatible with a discovery space
+
+    Checks that all actuators referenced in the actuator configurations are used
+    in the experiments defined in the discovery space configuration.
+
+    Params:
+        actuator_configurations: List of actuator configurations to validate
+        discovery_space_configuration: The discovery space configuration to validate against
+
+
+    Raises:
+        ValueError: If any actuator identifier in actuator_configurations does not
+            appear in the experiments of the discovery space
+    """
+    actuator_identifiers = {conf.actuatorIdentifier for conf in actuator_configurations}
+
+    # Check the actuators configurations refer to actuators used in the MeasurementSpace
+    # The experiment identifiers are in two different locations
+    if isinstance(
+        discovery_space_configuration.experiments, MeasurementSpaceConfiguration
+    ):
+        experiment_actuator_identifiers = {
+            experiment.actuatorIdentifier
+            for experiment in discovery_space_configuration.experiments.experiments
+        }
+    else:
+        experiment_actuator_identifiers = {
+            experiment.actuatorIdentifier
+            for experiment in discovery_space_configuration.experiments
+        }
+
+    if not experiment_actuator_identifiers.issuperset(actuator_identifiers):
+        raise ValueError(
+            f"Actuator Identifiers {actuator_identifiers} must appear in the experiments of its space"
+        )
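
Reviewer note: the space check above has two parts: collecting actuator identifiers from experiments that may sit directly on the space configuration or one level down inside a measurement-space object, and then requiring the configured actuators to be a subset of those. A minimal sketch of that logic follows, assuming simplified stand-in classes. ExperimentStub, MeasurementSpaceStub, SpaceConfigurationStub and both function names are hypothetical and not part of the orchestrator package.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ExperimentStub:
    """Hypothetical experiment reference carrying only its actuator id."""

    actuatorIdentifier: str


@dataclass
class MeasurementSpaceStub:
    """Hypothetical wrapper that nests the experiment list one level down."""

    experiments: list = field(default_factory=list)


@dataclass
class SpaceConfigurationStub:
    """Hypothetical space config: experiments is a list or a MeasurementSpaceStub."""

    experiments: object = None


def experiment_actuators(space):
    """Collect actuator ids from whichever location holds the experiments."""
    if isinstance(space.experiments, MeasurementSpaceStub):
        experiments = space.experiments.experiments
    else:
        experiments = space.experiments
    return {experiment.actuatorIdentifier for experiment in experiments}


def check_actuators_used_by_space(configured_actuators, space):
    """Raise ValueError unless every configured actuator is used by the space."""
    used = experiment_actuators(space)
    if not used.issuperset(configured_actuators):
        raise ValueError(
            f"Actuator Identifiers {sorted(configured_actuators)} "
            "must appear in the experiments of its space"
        )


flat_space = SpaceConfigurationStub(
    experiments=[ExperimentStub("simulator"), ExperimentStub("robotic_lab")]
)
nested_space = SpaceConfigurationStub(
    experiments=MeasurementSpaceStub(experiments=[ExperimentStub("simulator")])
)

check_actuators_used_by_space({"simulator"}, flat_space)  # passes silently
check_actuators_used_by_space({"simulator"}, nested_space)  # passes silently

try:
    check_actuators_used_by_space({"robotic_lab"}, nested_space)
except ValueError as error:
    print(f"rejected: {error}")
```

Because the test is a superset check, a space may use actuators that have no configuration. Only a configuration for an actuator the space never uses is rejected.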
+
+
+def validate_actuator_configuration_ids_against_space_ids(
+    actuator_configuration_identifiers: list[str],
+    space_identifiers: list[str],
+    project_context: ProjectContext,
+) -> list[ActuatorConfiguration]:
+    """Validates actuator configuration identifiers against space identifiers
+
+    Retrieves actuator configurations and space configurations from the metastore,
+    then validates that all actuator configurations are compatible with all specified
+    discovery spaces.
+
+    Params:
+        actuator_configuration_identifiers: List of actuator configuration resource
+            identifiers to validate
+        space_identifiers: List of discovery space resource identifiers to validate against
+        project_context: Project context for connecting to the metastore
+
+    Returns:
+        List of ActuatorConfiguration instances that were validated
+
+    Raises:
+        ValueError: If any actuator configuration is not compatible with any of the
+            discovery spaces, or if more than one ActuatorConfiguration references
+            the same actuator
+        ResourceDoesNotExistError: If any of the identifiers is not found in the project.
+
+    """
+    import orchestrator.metastore.sqlstore
+
+    sql = orchestrator.metastore.sqlstore.SQLStore(project_context=project_context)
+    space_configurations: list[DiscoverySpaceConfiguration] = [
+        sql.getResource(
+            identifier=identifier,
+            kind=CoreResourceKinds.DISCOVERYSPACE,
+            raise_error_if_no_resource=True,
+        ).config
+        for identifier in space_identifiers
+    ]
+
+    actuator_configurations = get_actuator_configurations(
+        project_context=project_context,
+        actuator_configuration_identifiers=actuator_configuration_identifiers,
+    )
+
+    for config in space_configurations:
+        validate_actuator_configurations_against_space_configuration(
+            actuator_configurations=actuator_configurations,
+            discovery_space_configuration=config,
+        )
+
+    return actuator_configurations
+
+
 class OperatorModuleConf(ModuleConf):
     moduleType: ModuleTypeEnum = pydantic.Field(default=ModuleTypeEnum.OPERATION)
 
@@ -128,10 +262,8 @@ class DiscoveryOperationConfiguration(pydantic.BaseModel):
     )
 
 
-class BaseOperationRunConfiguration(pydantic.BaseModel):
-    """Field shared by OrchestratorRunConfiguration and OperationResourceConfiguration
-
-    both are models used to run an operation"""
+class DiscoveryOperationResourceConfiguration(pydantic.BaseModel):
+    """Pydantic model used to define an operation"""
 
     operation: DiscoveryOperationConfiguration
     metadata: ConfigurationMetadata = pydantic.Field(
@@ -140,6 +272,10 @@ class BaseOperationRunConfiguration(pydantic.BaseModel):
         "Two optional keys that are used by convention are name and description",
     )
     actuatorConfigurationIdentifiers: list[str] = pydantic.Field(default=[])
+    spaces: list[str] = pydantic.Field(
+        description="List of ids of the spaces the operation will be applied to",
+        min_length=1,
+    )
     model_config = ConfigDict(
         extra="forbid",
         json_schema_extra={
@@ -160,118 +296,54 @@ def get_actuatorconfigurations(
             there are no actuatorConfigurationIdentifiers.
 
 
-        Raises: ValueError if there is more than one ActuatorConfigurationResource references the same actuator
+        Raises:
+            ValueError if there is more than one ActuatorConfigurationResource references the same actuator
+            ResourceDoesNotExistError if any actuator configuration identifier cannot be found in the project
         """
 
-        import orchestrator.metastore.sqlstore
-
         if not self.actuatorConfigurationIdentifiers:
             return []
 
-        sql = orchestrator.metastore.sqlstore.SQLStore(project_context=project_context)
-
-        actuator_configurations = [
-            sql.getResource(
-                identifier=identifier,
-                kind=CoreResourceKinds.ACTUATORCONFIGURATION,
-                raise_error_if_no_resource=True,
-            ).config
-            for identifier in self.actuatorConfigurationIdentifiers
-        ]
-
-        actuator_identifiers = {
-            conf.actuatorIdentifier for conf in actuator_configurations
-        }
-        if len(actuator_identifiers) != len(self.actuatorConfigurationIdentifiers):
-            raise ValueError("Only one ActuatorConfiguration is permitted per Actuator")
-
-        return actuator_configurations
-
-    def validate_actuatorconfigurations_against_space(
-        self,
-        project_context: ProjectContext,
-        discoverySpaceConfiguration: DiscoverySpaceConfiguration,
-    ) -> list[ActuatorConfiguration]:
-
-        actuator_configurations = self.get_actuatorconfigurations(
-            project_context=project_context
+        return get_actuator_configurations(
+            project_context=project_context,
+            actuator_configuration_identifiers=self.actuatorConfigurationIdentifiers,
         )
-        actuator_identifiers = {
-            conf.actuatorIdentifier for conf in actuator_configurations
-        }
-
-        # Check the actuators configurations refer to actuators used in the MeasurementSpace
-        # The experiment identifiers are in two different locations
-        if isinstance(
-            discoverySpaceConfiguration.experiments, MeasurementSpaceConfiguration
-        ):
-            experiment_actuator_identifiers = {
-                experiment.actuatorIdentifier
-                for experiment in discoverySpaceConfiguration.experiments.experiments
-            }
-        else:
-            experiment_actuator_identifiers = {
-                experiment.actuatorIdentifier
-                for experiment in discoverySpaceConfiguration.experiments
-            }
-
-        if not experiment_actuator_identifiers.issuperset(actuator_identifiers):
-            raise ValueError(
-                f"Actuator Identifiers {actuator_identifiers} must appear in the experiments of its space"
-            )
-
-        return actuator_configurations
-
-
-class DiscoveryOperationResourceConfiguration(BaseOperationRunConfiguration):
-
-    spaces: list[str] = pydantic.Field(
-        description="The spaces the operation will be applied to"
-    )
-
-    @pydantic.field_validator("spaces")
-    def check_space_set(cls, value):
-        """Checks at least one space identifier has been given"""
-
-        if len(value) == 0:
-            raise ValueError(
-                "You must provide at least one space identifier to an operation"
-            )
-
-        return value
 
     def validate_actuatorconfigurations(
         self, project_context: ProjectContext
     ) -> list[ActuatorConfiguration]:
+        """Gets and validates the actuator configuration resources referenced by actuatorConfigurationIdentifiers from the metastore if any
 
-        from orchestrator.core.discoveryspace.space import DiscoverySpace
+        This also requires getting the configuration of the discovery space
 
-        actuator_configurations: list[ActuatorConfiguration] = []
-        for space in self.spaces:
-            discovery_space = DiscoverySpace.from_stored_configuration(
-                project_context=project_context,
-                space_identifier=space,
-            )
+        Params:
+            project_context: Information for connection to the metastore
 
-            actuator_configurations.extend(
-                super().validate_actuatorconfigurations_against_space(
-                    project_context=project_context,
-                    discoverySpaceConfiguration=discovery_space.config,
-                )
-            )
+        Returns:
+            A list of ActuatorConfigurationResource instances. The list will be empty if
+            there are no actuatorConfigurationIdentifiers.
 
-        return actuator_configurations
 
+        Raises: ValueError if more than one ActuatorConfigurationResource references the same actuator
+        """
 
-class FunctionOperationInfo(pydantic.BaseModel):
-    """Class for holding information for operations executed via operator functions
+        return validate_actuator_configuration_ids_against_space_ids(
+            actuator_configuration_identifiers=self.actuatorConfigurationIdentifiers,
+            space_identifiers=self.spaces,
+            project_context=project_context,
+        )
 
-    Operators implemented as functions may need additional information.
-    Rather than have these as multiple params we gather them in this model"""
+
+class FunctionOperationInfo(pydantic.BaseModel):
+    """Class for providing information to operator functions"""
 
     metadata: ConfigurationMetadata = pydantic.Field(
         default=ConfigurationMetadata(),
         description="User defined metadata about the configuration. A set of keys and values. "
         "Two optional keys that are used by convention are name and description",
     )
     actuatorConfigurationIdentifiers: list[str] = pydantic.Field(default=[])
+    ray_namespace: str | None = pydantic.Field(
+        description="The namespace the operation should create ray workers/actors in",
+        default=None,
+    )
