Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deploy/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ spec:
parallelism:
type: integer
minimum: 1
initialParallelism:
type: integer
minimum: 1
deleteMode:
type: string
enum: [Savepoint, None, ForceCancel]
Expand Down
6 changes: 5 additions & 1 deletion docs/crd.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ Below is the list of fields in the custom resource and their description:
the operator uses the Web API to submit jobs.

* **parallelism** `type:int32`
Job level parallelism for the Flink Job.
Job level parallelism for the Flink Job.

* **initialParallelism** `type:int32`
Initial Job level parallelism for the Flink Job. It is used to deploy the new flink cluster when parallelism is absent.
If neither parallelism nor initialParallelism is specified a flink application will be created with parallelism level 1.

* **entryClass** `type:string`
Entry point for the Flink job.
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/app/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type FlinkApplicationSpec struct {
JobManagerConfig JobManagerConfig `json:"jobManagerConfig,omitempty"`
JarName string `json:"jarName"`
Parallelism int32 `json:"parallelism"`
InitialParallelism int32 `json:"initialParallelism,omitempty"`
EntryClass string `json:"entryClass,omitempty"`
ProgramArgs string `json:"programArgs,omitempty"`
// Deprecated: use SavepointPath instead
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/flinkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ func (r *ReconcileFlinkApplication) Reconcile(request reconcile.Request) (reconc
}

if instance.Spec.Parallelism == 0 {
instance.Spec.Parallelism = 1
if instance.Spec.InitialParallelism != 0 {
instance.Spec.Parallelism = instance.Spec.InitialParallelism
} else {
instance.Spec.Parallelism = 1
}
}

// We are seeing instances where getResource is removing TypeMeta
Expand Down