diff --git a/CHANGES.md b/CHANGES.md
index f9b9f1d28483..06131502ee7a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -70,8 +70,7 @@
 
 ## New Features / Improvements
 
-* Added support for setting disk provisioned IOPS and throughput in Dataflow runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` pipeline options (Java/Go) ([#38349](https://github.com/apache/beam/issues/38349)).
-* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Added support for setting disk provisioned IOPS and throughput in Dataflow runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` pipeline options (Java/Go/Python) ([#38349](https://github.com/apache/beam/issues/38349)).
 * TriggerStateMachineRunner changes from BitSetCoder to SentinelBitSetCoder
   to encode finished bitset. SentinelBitSetCoder and BitSetCoder are state
   compatible. Both coders can decode encoded bytes from the other coder
@@ -2435,4 +2434,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss
 
 ## Highlights
 
-- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
+- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
\ No newline at end of file
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index d60d75283eab..91dd77d6cf02 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1405,6 +1405,24 @@ def _add_argparse_args(cls, parser):
         dest='disk_type',
         default=None,
         help=('Specifies what type of persistent disk should be used.'))
+    parser.add_argument(
+        '--disk_provisioned_iops',
+        type=int,
+        default=None,
+        dest='disk_provisioned_iops',
+        help=(
+            'The provisioned IOPS of the disk. If not set, the Dataflow service'
+            ' will choose a reasonable default.'),
+    )
+    parser.add_argument(
+        '--disk_provisioned_throughput_mibps',
+        type=int,
+        default=None,
+        dest='disk_provisioned_throughput_mibps',
+        help=(
+            'The provisioned throughput of the disk in MiB/s. If not set, the'
+            ' Dataflow service will choose a reasonable default.'),
+    )
     parser.add_argument(
         '--worker_region',
         default=None,
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index 215c44156ea6..901f56b99cb6 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -444,12 +444,18 @@ def test_worker_options(self):
         'abc',
         '--disk_type',
         'def',
+        '--disk_provisioned_iops',
+        '4000',
+        '--disk_provisioned_throughput_mibps',
+        '200',
         '--element_processing_timeout_minutes',
         '10',
     ])
     worker_options = options.view_as(WorkerOptions)
     self.assertEqual(worker_options.machine_type, 'abc')
     self.assertEqual(worker_options.disk_type, 'def')
+    self.assertEqual(worker_options.disk_provisioned_iops, 4000)
+    self.assertEqual(worker_options.disk_provisioned_throughput_mibps, 200)
     self.assertEqual(worker_options.element_processing_timeout_minutes, 10)
 
     options = PipelineOptions(
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 29cb36071488..871f227c7c73 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -207,6 +207,11 @@ def __init__(
       pool.diskSizeGb = self.worker_options.disk_size_gb
     if self.worker_options.disk_type:
       pool.diskType = self.worker_options.disk_type
+    if self.worker_options.disk_provisioned_iops is not None:
+      pool.diskProvisionedIops = self.worker_options.disk_provisioned_iops
+    if self.worker_options.disk_provisioned_throughput_mibps is not None:
+      pool.diskProvisionedThroughputMibps = (
+          self.worker_options.disk_provisioned_throughput_mibps)
     if self.worker_options.zone:
       pool.zone = self.worker_options.zone
     if self.worker_options.network:
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 66b1c8e1e5bb..3870bb4a2a4a 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -609,6 +609,23 @@ def test_number_of_worker_harness_threads(self):
                                 FAKE_PIPELINE_URL)
     self.assertEqual(env.proto.workerPools[0].numThreadsPerWorker, 2)
 
+  def test_disk_provisioning_options(self):
+    pipeline_options = PipelineOptions([
+        '--temp_location',
+        'gs://any-location/temp',
+        '--disk_provisioned_iops',
+        '4000',
+        '--disk_provisioned_throughput_mibps',
+        '200'
+    ])
+    env = apiclient.Environment([],
+                                pipeline_options,
+                                '2.0.0',
+                                FAKE_PIPELINE_URL)
+    self.assertEqual(env.proto.workerPools[0].diskProvisionedIops, 4000)
+    self.assertEqual(
+        env.proto.workerPools[0].diskProvisionedThroughputMibps, 200)
+
   @mock.patch(
       'apache_beam.runners.dataflow.internal.apiclient.'
       'beam_version.__version__',
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
index 0c096e73c1ac..582fb30b57b1 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
@@ -2634,8 +2634,8 @@ class FlexTemplateRuntimeEnvironment(_messages.Message):
     ipConfiguration: Configuration for VM IPs.
     kmsKeyName: Name for the Cloud KMS key for the job. Key format is:
       projects//locations//keyRings//cryptoKeys/
-    launcherMachineType: The machine type to use for launching the job. The
-      default is n1-standard-1.
+    launcherMachineType: The machine type to use for launching the job. If not
+      set, Dataflow will select a default machine type.
     machineType: The machine type to use for the job. Defaults to the value
       from the template if not specified.
     maxWorkers: The maximum number of Google Compute Engine instances to be
@@ -3209,6 +3209,7 @@ class Job(_messages.Message):
       attempts to create a job with the same name as an active job that
       already exists, the attempt returns the existing job. The name must
       match the regular expression `[a-z]([-a-z0-9]{0,1022}[a-z0-9])?`
+    pausable: Output only. Indicates whether the job can be paused.
     pipelineDescription: Preliminary field: The format of this data may change
       at any time. A description of the user pipeline and stages through which
       it is executed. Created by Cloud Dataflow service. Only retrieved with
@@ -3498,22 +3499,23 @@ class AdditionalProperty(_messages.Message):
   labels = _messages.MessageField('LabelsValue', 10)
   location = _messages.StringField(11)
   name = _messages.StringField(12)
-  pipelineDescription = _messages.MessageField('PipelineDescription', 13)
-  projectId = _messages.StringField(14)
-  replaceJobId = _messages.StringField(15)
-  replacedByJobId = _messages.StringField(16)
-  requestedState = _messages.EnumField('RequestedStateValueValuesEnum', 17)
-  runtimeUpdatableParams = _messages.MessageField('RuntimeUpdatableParams', 18)
-  satisfiesPzi = _messages.BooleanField(19)
-  satisfiesPzs = _messages.BooleanField(20)
-  serviceResources = _messages.MessageField('ServiceResources', 21)
-  stageStates = _messages.MessageField('ExecutionStageState', 22, repeated=True)
-  startTime = _messages.StringField(23)
-  steps = _messages.MessageField('Step', 24, repeated=True)
-  stepsLocation = _messages.StringField(25)
-  tempFiles = _messages.StringField(26, repeated=True)
-  transformNameMapping = _messages.MessageField('TransformNameMappingValue', 27)
-  type = _messages.EnumField('TypeValueValuesEnum', 28)
+  pausable = _messages.BooleanField(13)
+  pipelineDescription = _messages.MessageField('PipelineDescription', 14)
+  projectId = _messages.StringField(15)
+  replaceJobId = _messages.StringField(16)
+  replacedByJobId = _messages.StringField(17)
+  requestedState = _messages.EnumField('RequestedStateValueValuesEnum', 18)
+  runtimeUpdatableParams = _messages.MessageField('RuntimeUpdatableParams', 19)
+  satisfiesPzi = _messages.BooleanField(20)
+  satisfiesPzs = _messages.BooleanField(21)
+  serviceResources = _messages.MessageField('ServiceResources', 22)
+  stageStates = _messages.MessageField('ExecutionStageState', 23, repeated=True)
+  startTime = _messages.StringField(24)
+  steps = _messages.MessageField('Step', 25, repeated=True)
+  stepsLocation = _messages.StringField(26)
+  tempFiles = _messages.StringField(27, repeated=True)
+  transformNameMapping = _messages.MessageField('TransformNameMappingValue', 28)
+  type = _messages.EnumField('TypeValueValuesEnum', 29)
 
 
 class JobExecutionDetails(_messages.Message):
@@ -5342,8 +5344,14 @@ class RuntimeUpdatableParams(_messages.Message):
   during job creation.
 
   Fields:
-    acceptableBacklogDuration: Optional. The backlog threshold duration in
-      seconds for autoscaling. Value must be non-negative.
+    acceptableBacklogDuration: Optional. Deprecated: Use `latency_tier`
+      instead. The backlog threshold duration in seconds for autoscaling.
+      Value must be non-negative.
+    autoscalingTier: Optional. Deprecated: Use `latency_tier` instead. The
+      backlog threshold tier for autoscaling. Value must be one of "low-
+      latency", "medium-latency", or "high-latency".
+    latencyTier: Optional. The backlog threshold tier for autoscaling. Value
+      must be one of "low-latency", "medium-latency", or "high-latency".
     maxNumWorkers: The maximum number of workers to cap autoscaling at. This
       field is currently only supported for Streaming Engine jobs.
     minNumWorkers: The minimum number of workers to scale down to. This field
@@ -5357,9 +5365,11 @@
   """
 
   acceptableBacklogDuration = _messages.StringField(1)
-  maxNumWorkers = _messages.IntegerField(2, variant=_messages.Variant.INT32)
-  minNumWorkers = _messages.IntegerField(3, variant=_messages.Variant.INT32)
-  workerUtilizationHint = _messages.FloatField(4)
+  autoscalingTier = _messages.StringField(2)
+  latencyTier = _messages.StringField(3)
+  maxNumWorkers = _messages.IntegerField(4, variant=_messages.Variant.INT32)
+  minNumWorkers = _messages.IntegerField(5, variant=_messages.Variant.INT32)
+  workerUtilizationHint = _messages.FloatField(6)
 
 
 class SDKInfo(_messages.Message):
@@ -7775,6 +7785,9 @@ class WorkerPool(_messages.Message):
     defaultPackageSet: The default package set to install. This allows the
      service to select a default set of packages which are useful to worker
      harnesses written in a particular language.
+    diskProvisionedIops: Optional. IOPS provisioned for the root disk for VMs.
+    diskProvisionedThroughputMibps: Optional. Throughput provisioned for the
+      root disk for VMs.
     diskSizeGb: Size of root disk for VMs, in GB. If zero or unspecified, the
       service will attempt to choose a reasonable default.
     diskSourceImage: Fully qualified source image for disks.
@@ -7938,25 +7951,27 @@ class AdditionalProperty(_messages.Message):
   autoscalingSettings = _messages.MessageField('AutoscalingSettings', 1)
   dataDisks = _messages.MessageField('Disk', 2, repeated=True)
   defaultPackageSet = _messages.EnumField('DefaultPackageSetValueValuesEnum', 3)
-  diskSizeGb = _messages.IntegerField(4, variant=_messages.Variant.INT32)
-  diskSourceImage = _messages.StringField(5)
-  diskType = _messages.StringField(6)
-  ipConfiguration = _messages.EnumField('IpConfigurationValueValuesEnum', 7)
-  kind = _messages.StringField(8)
-  machineType = _messages.StringField(9)
-  metadata = _messages.MessageField('MetadataValue', 10)
-  network = _messages.StringField(11)
-  numThreadsPerWorker = _messages.IntegerField(12, variant=_messages.Variant.INT32)
-  numWorkers = _messages.IntegerField(13, variant=_messages.Variant.INT32)
-  onHostMaintenance = _messages.StringField(14)
-  packages = _messages.MessageField('Package', 15, repeated=True)
-  poolArgs = _messages.MessageField('PoolArgsValue', 16)
-  sdkHarnessContainerImages = _messages.MessageField('SdkHarnessContainerImage', 17, repeated=True)
-  subnetwork = _messages.StringField(18)
-  taskrunnerSettings = _messages.MessageField('TaskRunnerSettings', 19)
-  teardownPolicy = _messages.EnumField('TeardownPolicyValueValuesEnum', 20)
-  workerHarnessContainerImage = _messages.StringField(21)
-  zone = _messages.StringField(22)
+  diskProvisionedIops = _messages.IntegerField(4)
+  diskProvisionedThroughputMibps = _messages.IntegerField(5)
+  diskSizeGb = _messages.IntegerField(6, variant=_messages.Variant.INT32)
+  diskSourceImage = _messages.StringField(7)
+  diskType = _messages.StringField(8)
+  ipConfiguration = _messages.EnumField('IpConfigurationValueValuesEnum', 9)
+  kind = _messages.StringField(10)
+  machineType = _messages.StringField(11)
+  metadata = _messages.MessageField('MetadataValue', 12)
+  network = _messages.StringField(13)
+  numThreadsPerWorker = _messages.IntegerField(14, variant=_messages.Variant.INT32)
+  numWorkers = _messages.IntegerField(15, variant=_messages.Variant.INT32)
+  onHostMaintenance = _messages.StringField(16)
+  packages = _messages.MessageField('Package', 17, repeated=True)
+  poolArgs = _messages.MessageField('PoolArgsValue', 18)
+  sdkHarnessContainerImages = _messages.MessageField('SdkHarnessContainerImage', 19, repeated=True)
+  subnetwork = _messages.StringField(20)
+  taskrunnerSettings = _messages.MessageField('TaskRunnerSettings', 21)
+  teardownPolicy = _messages.EnumField('TeardownPolicyValueValuesEnum', 22)
+  workerHarnessContainerImage = _messages.StringField(23)
+  zone = _messages.StringField(24)
 
 
 class WorkerSettings(_messages.Message):
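Usage sketch (not part of the patch): with this change applied, a Python pipeline can request provisioned disk performance on Dataflow through the two new flags. Only `--disk_provisioned_iops` and `--disk_provisioned_throughput_mibps` come from this diff; the project, region, bucket, and toy transform below are placeholders, and whether the values are honored depends on the disk type the job runs with.

```python
# Minimal sketch of the new options in use. The apiclient change above
# copies these options onto workerPools[0].diskProvisionedIops and
# workerPools[0].diskProvisionedThroughputMibps in the job request.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                  # placeholder GCP project
    '--region=us-central1',                  # placeholder region
    '--temp_location=gs://my-bucket/temp',   # placeholder bucket
    '--disk_provisioned_iops=4000',
    '--disk_provisioned_throughput_mibps=200',
])

# A trivial pipeline, just to show the options being passed through.
with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(['a', 'b', 'c']) | beam.Map(print)
```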