This post distills months of debugging and optimizing ML and AI workflows on Google Cloud DataProc clusters, using Cloud Composer / Apache Airflow for orchestration.
If you would prefer to jump directly to a chapter:
- What is DataProc? And why do we need it?
- Environments, what’s shared and what’s local
- Environment variables
- Cluster configuration
- Cluster startup optimization
- Runtime variables
Otherwise, hold on tight, because here we go…
What is DataProc? And why do we need it?
Understanding Airflow and Spark in Depth
Google Cloud Composer is a managed version of Apache Airflow, an orchestration tool for defining, scheduling, and monitoring workflows. Workflows in Airflow are structured as Directed Acyclic Graphs (DAGs), which define a series of tasks and their dependencies.
Structure of an Airflow DAG
A typical Airflow DAG for DataProc involves three key steps:
- Provisioning a DataProc cluster to allocate compute resources.
- Executing the workload, typically a Spark job.
- Decommissioning the cluster to optimize resource usage and cost.
This approach ensures efficient resource utilization, as clusters exist only for the duration of processing. Managing cost and resource efficiency is critical in large-scale data workflows.
Here’s an overview of how such a workflow is structured:

Airflow provides a mechanism to define tasks and their relationships programmatically, ensuring that dependencies are correctly handled. The DataProc cluster itself is a managed Apache Spark environment on Google Cloud, similar to Amazon EMR, designed for scalable, distributed data processing.
What is Apache Spark?
Apache Spark is an open-source, distributed computing system optimized for large-scale data processing. It provides APIs in Scala, Java, Python, and R and operates efficiently using an in-memory computation model.
Key Advantages of Spark
- In-Memory Processing: Unlike traditional disk-based processing systems like Hadoop MapReduce, Spark keeps intermediate data in memory, dramatically improving performance.
- Lazy Evaluation: Spark constructs an execution plan before running computations, minimizing redundant operations.
- Fault Tolerance: Spark’s Resilient Distributed Dataset (RDD) abstraction ensures data recovery in case of node failures.
- DataFrame and SQL API: Higher-level abstractions simplify working with structured data, integrating SQL queries directly into Spark workflows.
- Scalability: Spark can scale across thousands of nodes, making it suitable for enterprise-grade workloads.
Integrating Airflow and DataProc for scalable workflows
Consider a scenario where a machine learning model requires training on a large dataset. The workflow follows these steps:
- Cluster Provisioning: A DataProc cluster is created with an appropriate configuration.
- Job Execution: A Spark job processes the dataset.
- Storage and Cleanup: Processed results are stored in Google Cloud Storage (GCS), and the cluster is shut down.
- Monitoring: Airflow tracks job status and applies failure-handling mechanisms.
- Automated Scheduling: Recurring workflows can be scheduled for periodic execution.
Example Airflow DAG for DataProc
Below is an example Airflow DAG that automates this workflow:
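A minimal sketch of such a DAG, using the Dataproc operators from the Airflow Google provider; the project, region, bucket, and cluster names are placeholders.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)

PROJECT_ID = "my-project"
REGION = "us-central1"
CLUSTER_NAME = "ml-training-cluster"   # lowercase letters, numbers, hyphens

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
}

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": "gs://my-bucket/jobs/train_model.py"},
}

with DAG(
    dag_id="dataproc-training-pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # "schedule" on newer Airflow releases
    catchup=False,
) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config=CLUSTER_CONFIG,
    )
    submit_job = DataprocSubmitJobOperator(
        task_id="submit_pyspark_job",
        project_id=PROJECT_ID,
        region=REGION,
        job=PYSPARK_JOB,
    )
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule="all_done",  # tear down even if the job fails
    )

    create_cluster >> submit_job >> delete_cluster
```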
Optimization strategies for cost and performance
Efficiently managing DataProc clusters and Airflow workflows can significantly impact cost and performance. Consider the following:
- Auto-scaling: Dynamically allocate resources based on workload demand.
- Pre-emptible Instances: Use cost-effective, short-lived VMs to lower computational costs.
- Persistent vs. Ephemeral Clusters: Persistent clusters reduce spin-up time but may be more expensive if underutilized.
- Storage Optimization: Compress and clean up unused data to minimize storage costs.
- Spot Instances: Some cloud providers offer excess compute capacity at reduced prices.
- Resource Monitoring: Track CPU and memory usage to identify bottlenecks and improve efficiency.
Conclusion
By leveraging Google Cloud Composer (Airflow) and DataProc (Spark), organizations can automate complex data workflows efficiently. The combination of Airflow for orchestration and Spark for distributed computing provides a scalable, cost-effective solution for big data processing.
With optimized configurations and efficient resource allocation, these tools enable businesses to process massive datasets while minimizing costs. Whether for log processing, machine learning training, or ETL pipelines, this setup offers high performance, automation, and scalability.
Understanding environments: shared vs local
Conceptual overview of Airflow and Spark environments
Google Cloud Composer, a managed Apache Airflow, orchestrates workflows using Directed Acyclic Graphs (DAGs). DAGs define a sequence of tasks and their dependencies, ensuring reliable workflow execution.
A typical Composer DAG follows three primary steps:
- Provisioning a DataProc cluster
- Executing the workload
- Tearing down the cluster
This structure ensures efficient resource utilization and cost optimization.

How Airflow and DataProc work together
A DataProc cluster is a Google Cloud-managed service that transforms Compute Instance resources into an Apache Spark cluster for processing large-scale data. When Airflow provisions a cluster, it effectively creates a secondary execution environment within a dedicated Virtual Private Cloud (VPC).

Configuring the DataProc environment
DataProc clusters can be provisioned using:
- Google Cloud Console
- Command-line tools
- Airflow’s Dataproc operators (e.g., DataprocCreateClusterOperator)
At cluster creation, several background operations are automatically executed, including:
- Provisioning head node(s)
- Installing the Hadoop Distributed File System (HDFS)
- Configuring YARN for resource scheduling
- Establishing internal networking

Provisioning worker nodes
Once the Spark cluster is initialized, worker nodes are dynamically provisioned based on the specified configuration. Each worker node:
- Pulls the base DataProc image
- Starts the Spark service
- Runs initialization scripts
This process repeats for each requested worker node.

Worker nodes undergo health checks before registering with head nodes to receive jobs.
Executing PySpark Jobs
Within the cluster, Python code runs as PySpark jobs on worker nodes. Each worker node requires a Python environment, ensuring compatibility with submitted scripts. Once all jobs are processed, Airflow enters the Teardown phase, deallocating resources.

Configuration scope and security considerations
When tuning an Airflow-Spark system, it’s essential to recognize that settings and parameters exist at multiple levels. Each layer—including Airflow, DataProc, Spark, and individual worker nodes—can influence execution performance. Additionally, security policies can be enforced at various levels, affecting deployment strategies and access control.

Conclusion
Understanding shared and local environments within Airflow and DataProc is essential for designing scalable, cost-effective workflows. By correctly configuring clusters, defining resource scopes, and implementing security policies, teams can optimize performance while maintaining a robust orchestration framework.
Passing environment variables in DataProc and Composer
Executive summary
A common design pattern for orchestrating AI and ML pipelines leverages Google Cloud DataProc to execute Spark-based workflows, offering scalable compute resources for distributed data processing.
According to Google’s marketing materials, DataProc offers:
Serverless deployment, logging, and monitoring let you focus on your data and analytics, not on your infrastructure.
As data practitioners, we may not always have access to all cloud resources, including direct access to DataProc logs, cluster monitoring tools, or SSH access to cluster instances. Security policies or organizational controls often restrict these capabilities, requiring us to rely on orchestration tools like Airflow and alternative logging or monitoring approaches.
A common design pattern for Google Cloud clients is to use Google Cloud Composer, a managed version of Apache Airflow, to orchestrate and schedule Spark jobs. Each Composer environment is linked to a dedicated Google Cloud Storage (GCS) bucket, which is also mounted as a local filesystem on the DataProc cluster when it starts.
Standard workflow
The typical workflow follows these steps:
- Initialize a Directed Acyclic Graph (DAG) in Airflow.
- The initial node provisions a new Spark cluster.
- Subsequent nodes submit PySpark jobs to the cluster.
- The final node tears down the cluster.
The Airflow DataProc operators make it possible to orchestrate this entire workflow using Python code that runs within the Composer environment.
Here’s an example DAG that reflects this pattern:
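A sketch of the same pattern as it commonly appears in practice, with the deployment-specific values pulled from Composer-level Airflow Variables rather than hard-coded; variable names and URIs are illustrative.

```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)

# Values injected at the Composer level (Admin > Variables)
PROJECT_ID = Variable.get("project_id", default_var="my-project")
REGION = Variable.get("dataproc_region", default_var="us-central1")
BUCKET = Variable.get("composer_bucket", default_var="my-composer-bucket")
ENV = Variable.get("environment", default_var="dev")

# Cluster names: lowercase letters, numbers, and hyphens only
CLUSTER_NAME = f"spark-cluster-{ENV}"

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": f"gs://{BUCKET}/jobs/main.py"},
}

with DAG(
    dag_id="spark-pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config={
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
        },
    )
    submit_job = DataprocSubmitJobOperator(
        task_id="submit_pyspark_job",
        project_id=PROJECT_ID,
        region=REGION,
        job=PYSPARK_JOB,
    )
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule="all_done",
    )

    create_cluster >> submit_job >> delete_cluster
```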
Airflow DAG IDs may contain letters, numbers, dots, dashes, and underscores, but DataProc cluster names are stricter: only lowercase letters, numbers, and hyphens are allowed. Avoid underscores (_) and periods in cluster names, as they can cause silent failures in Google Cloud operations.
The DAG .py file is uploaded to the Composer dags directory in the associated GCS bucket (gs://<BUCKET>/dags). Airflow polls this directory every minute, and new files automatically appear in the UI. When updating a DAG, it’s good practice to confirm that the new code is visible under the Code tab before triggering a run.
Managing composer variables
To enhance security and maintain consistency across environments, many variables are injected into the DAG runtime environment directly from the Composer instance. This allows the DAG to dynamically retrieve these values using either Python’s built-in os.getenv() or Airflow’s airflow.models.Variable class.
To support reproducibility and deployment automation, projects may include a variables.json file in their source repository. This file can be deployed to Composer through several methods.
Deploying variables
Method 1: Deploy to the GCS bucket
Airflow supports automatically importing variables from a variables.json file placed in gs://<BUCKET>/data/. Adding a command to the deployment pipeline, such as in .gitlab-ci.yml or .github/workflows/release.yml, ensures the file is copied during deployment:
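For example, a single copy step (shown here as the shell command the CI job would run; the bucket name is a placeholder):

```bash
# Copy the variables file into the Composer environment's data/ folder
gsutil cp variables.json gs://<BUCKET>/data/variables.json
```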
At present, automatic parsing of this file is restricted, though this may change in higher environments with appropriate approval.
Method 2: Import via command line
The variables can also be imported directly using the gcloud CLI:
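One possible form, assuming the file is first uploaded to the environment’s data/ folder; the environment and location names are placeholders, and the exact sub-commands depend on your Composer/Airflow version:

```bash
# Upload the file to the data/ folder (mapped to /home/airflow/gcs/data/ inside Composer)
gcloud composer environments storage data import \
    --environment=my-composer-env --location=us-central1 \
    --source=variables.json

# Run the Airflow CLI inside the environment to import the variables
gcloud composer environments run my-composer-env --location=us-central1 \
    variables import -- /home/airflow/gcs/data/variables.json
```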
This approach would require CI/CD runner permissions to authenticate, upload, and apply the variables. Currently, these permissions are not granted to the service account.
Method 3: Manual import via web interface
Variables can also be managed directly through the Airflow UI.
- Navigate to Admin > Variables.
- Manually enter key-value pairs or upload the variables.json file directly.

The uploaded file’s variables will appear in the table.

Preferred approach
Due to the infrequent nature of variable changes and the current lack of automation permissions, manually uploading variables through the Airflow UI is the preferred method. This also improves operational security, as no sensitive bucket names or regions need to be stored in the source repository.
This approach balances flexibility, security, and operational control while ensuring variables are correctly injected into the Composer environment at runtime.
DataProc cluster configuration
Introduction
In many DataProc projects, jobs are executed under nearly identical conditions — they typically have low memory and CPU requirements and access the same data sources.
To avoid duplicating boilerplate configuration across multiple DAGs, a shared configuration file can be created under the /dag directory. Each environment (e.g., development, UAT, production) can have its own <env>.py file that defines a standard cluster configuration. This shared configuration can then be imported into all relevant DAGs, with project-specific overrides applied when necessary.
Example shared configuration file
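A sketch of what such a file might contain, here using the ClusterGenerator helper from the Airflow Google provider; the machine types, bucket names, and TTL values are illustrative.

```python
# dag/dev.py - one file per environment (dev.py, uat.py, prd.py)
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

PROJECT_ID = "my-project-dev"
REGION = "us-central1"
BUCKET = "my-composer-bucket-dev"

DEFAULT_CLUSTER_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    num_workers=2,                        # must be at least 2 (see below)
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    image_version="2.1-debian11",
    init_actions_uris=[f"gs://{BUCKET}/scripts/startup.sh"],
    auto_delete_ttl=1800,                 # total cluster lifetime, in seconds
    storage_bucket=BUCKET,
).make()

DEFAULT_DAG_ARGS = {
    "owner": "data-team",
    "retries": 0,
}
```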
Number of workers
The minimum number of workers (num_workers) must be greater than 1. DataProc requires at least two workers to form a fully functional cluster.
Clusters configured with only a single worker will pass initial DAG validation and begin provisioning, but will ultimately time out with an unclear error message.
Cluster timeout (auto_delete_ttl)
The auto_delete_ttl parameter is often misunderstood. It defines the total lifetime of the cluster, not an idle timeout.
If this value is set below 10 minutes (600 seconds), the DAG will validate successfully and cluster provisioning will start, but the cluster will silently terminate without producing any meaningful error messages.
Typical startup durations
For small Spark clusters with two standard workers in North American regions, observed provisioning times are:
- Minimum startup time: Approximately 7 minutes
- Cold start (e.g., Monday morning): Up to 15 minutes
Clusters that install common data science libraries such as NumPy, scikit-learn, and pandas will often exceed 12.5 minutes for startup, even under ideal conditions.
Poorly optimized initialization scripts — especially those with complex or unconstrained pip install commands — can easily push startup times beyond 10 minutes.
Recommended configuration
Despite natural cost-saving instincts, I strongly recommend setting the auto_delete_ttl parameter significantly higher than you might expect — 20 to 30 minutes is a reasonable default.
For DAGs involving model training jobs, you may need to extend the timeout to several hours.
Because all DAGs include a cluster teardown task using trigger_rule="all_done", a generously high timeout does not pose a risk of excessive costs. The cluster will always be cleaned up automatically, even if the job fails or gets stuck.
DAG structure
With the previous configuration steps complete, the DAG itself follows a standard structure:
- Pull key variables from the Composer environment, including the environment name, region, and bucket.
- Retrieve the default cluster configuration from a shared submodule.
- Load default DAG arguments from the shared submodule.
- Assemble the DAG: cluster spin-up → PySpark job submission → cluster teardown.
Example DAG structure
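A sketch of a DAG following this structure; the shared module path (dag.dev), package name, and GCS paths are placeholders.

```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)

# Shared, per-environment configuration (see the previous section)
from dag.dev import BUCKET, DEFAULT_CLUSTER_CONFIG, DEFAULT_DAG_ARGS, PROJECT_ID, REGION

ENV = Variable.get("environment", default_var="dev")
CLUSTER_NAME = f"feature-pipeline-{ENV}"   # lowercase letters, numbers, hyphens

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": f"gs://{BUCKET}/jobs/main.py",
        "python_file_uris": [f"gs://{BUCKET}/dist/my_project-0.1.0-py3-none-any.whl"],
        "args": ["--env", ENV],
    },
}

with DAG(
    dag_id="feature-pipeline",
    default_args=DEFAULT_DAG_ARGS,
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config=DEFAULT_CLUSTER_CONFIG,
    )
    submit_job = DataprocSubmitJobOperator(
        task_id="submit_pyspark_job",
        project_id=PROJECT_ID,
        region=REGION,
        job=PYSPARK_JOB,
    )
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule="all_done",
    )

    create_cluster >> submit_job >> delete_cluster
```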
Important considerations
As with DAG IDs, keep cluster names simple: they may only contain lowercase letters, numbers, and hyphens. Avoid underscores and periods, as they will silently break cluster provisioning.
DAG files should avoid direct imports of business logic. Since DAGs are parsed by the Composer environment (a minimal Python environment), adding dependencies requires formal approval and complicates deployment.
Business logic should be packaged as a .whl, .zip, or .tar.gz file and referenced using the python_file_uris argument in the job configuration.
Job submission and argument handling
Since Airflow and Composer operate independently from the DataProc/Spark cluster, relative imports into DAG files are not possible. Instead, all arguments must be explicitly passed in the DataprocSubmitJobOperator.job configuration as a valid DataProc job dictionary.
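A sketch of such a job dictionary; the bucket, package, and argument names are illustrative.

```python
PROJECT_ID = "my-project"
CLUSTER_NAME = "feature-pipeline-dev"
BUCKET = "my-composer-bucket"

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        # Entry-point script copied to GCS during deployment
        "main_python_file_uri": f"gs://{BUCKET}/jobs/main.py",
        # Packaged business logic (wheel/zip/tar.gz) installed on the workers
        "python_file_uris": [f"gs://{BUCKET}/dist/my_project-0.1.0-py3-none-any.whl"],
        # All runtime configuration is passed explicitly as command-line arguments
        "args": [
            "--env", "dev",
            "--input-path", f"gs://{BUCKET}/input/",
            "--output-path", f"gs://{BUCKET}/output/",
        ],
    },
}
```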
Reworking existing code
Business logic must be restructured to accept command-line arguments using argparse, rather than relying on imported configurations.
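A sketch of a reworked entry point; the argument names mirror the job dictionary above, and the I/O paths are placeholders.

```python
import argparse

from pyspark.sql import SparkSession


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Feature pipeline job")
    parser.add_argument("--env", required=True, choices=["dev", "uat", "prd"])
    parser.add_argument("--input-path", required=True)
    parser.add_argument("--output-path", required=True)
    return parser.parse_args()


def main() -> None:
    # All configuration arrives via the command line rather than imported modules
    args = parse_args()
    spark = SparkSession.builder.appName(f"feature-pipeline-{args.env}").getOrCreate()
    df = spark.read.parquet(args.input_path)
    df.write.mode("overwrite").parquet(args.output_path)


if __name__ == "__main__":
    main()
```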
DataProc cluster startup optimization
Introduction
Cluster startup time is a crucial factor in DataProc job efficiency. When a DataProc cluster is provisioned, worker nodes are allocated from a Compute Instance pool. The provisioning time depends on machine type availability in the compute region. Larger memory machines typically take longer to provision due to lower availability in the data center.
Once provisioned, workers automatically apply the DataProc image, which includes Java, Scala (for Spark), and Python 3.11 with Conda. However, no additional libraries are installed, meaning all project dependencies must be configured before executing any job.
To streamline the startup process, the initialization script follows three key steps:
- OS package update (apt-get)
- Install OS binaries (e.g., required system dependencies)
- Install Python requirements
Optimizing dependency installation
There are multiple ways to install Python dependencies within a DataProc runtime. The choice of method significantly impacts cluster startup time.
The key options include:
- Installing from a requirements file
- Installing from a built Python package
- Using Poetry for dependency resolution
Option 1: Installing from a requirements file
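A sketch of the relevant initialization-script step; the bucket and paths are placeholders.

```bash
#!/bin/bash
set -euxo pipefail

# Copy the requirements file from GCS and install it with the node's pip
# (depending on the image, you may need the full path, e.g. /opt/conda/default/bin/pip)
gsutil cp gs://<BUCKET>/dist/requirements.txt /tmp/requirements.txt
python -m pip install -r /tmp/requirements.txt
```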
Option 2: Installing from a .whl package
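A sketch of the wheel-based variant; the package name is a placeholder.

```bash
#!/bin/bash
set -euxo pipefail

# Copy the pre-built wheel from GCS and install it directly
gsutil cp gs://<BUCKET>/dist/my_project-0.1.0-py3-none-any.whl /tmp/
python -m pip install /tmp/my_project-0.1.0-py3-none-any.whl
```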
Final optimization strategy
Based on extensive testing, the most efficient approach was determined to be:
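A sketch of the resulting initialization script under those constraints: no apt-get step, a fully pinned requirements file, and a plain pip install. The bucket path is a placeholder.

```bash
#!/bin/bash
set -euxo pipefail

# No apt-get update / OS binaries step: the base DataProc image already has what we need

# Requirements are fully pinned (package==x.y.z), so pip performs no dependency resolution
gsutil cp gs://<BUCKET>/dist/requirements.txt /tmp/requirements.txt
python -m pip install --no-cache-dir -r /tmp/requirements.txt
```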
The COIN-OR installation could be removed because a version update to the data science libraries meant that the solvers were now built into the Python libraries themselves.
Key optimizations
- Dropped apt-get updates: Eliminating OS package updates significantly reduces startup time (~3 minutes saved).
- Explicit dependency versioning: Pre-resolving dependency versions avoids unnecessary resolution during installation.
- Using pip over poetry or conda: pip provides the fastest installation for pre-versioned dependencies.
Python package management for DataProc
When submitting PySpark jobs, dependencies must be packaged for the Spark environment. Unlike cluster-wide dependencies installed via startup scripts, these packages are defined at the job level.
Five methods for building a Python package
Option 1) Poetry build
With no changes to the project, call poetry to build a wheel using the pyproject.toml file.
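One way to do this (the wheel lands in dist/):

```bash
poetry build --format wheel
```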
A requirements file is still required for cluster setup. To export the dependencies:
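A possible export command; the spark group name is illustrative, and newer Poetry releases need the poetry-plugin-export plugin for this:

```bash
poetry export -f requirements.txt --output requirements.txt \
    --without-hashes --with spark
```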
Here we use --with to select the required dependency groups. The --without-hashes flag is important for building a requirements file that is OS-agnostic; otherwise, a requirements file built on a Mac will not resolve matching dependencies on a Linux-based CI pipeline.
Option 2) Use Python’s built-in tools
Create a basic setup.py:
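A minimal sketch; the project name, version, and src/ layout are assumptions.

```python
from pathlib import Path

from setuptools import find_packages, setup

# Read the pinned requirements so they are installed alongside the package
requirements = [
    line.strip()
    for line in Path("requirements.txt").read_text().splitlines()
    if line.strip() and not line.startswith("#")
]

setup(
    name="my_project",
    version="0.1.0",
    packages=find_packages(where="src"),
    package_dir={"": "src"},
    install_requires=requirements,
)
```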
With the setup file we can then build.
2a) Old method (setuptools)
The traditional way of building a Python package is with the setuptools library and a direct setup.py invocation. This approach is now largely discouraged and will show deprecation warnings on the command line when executed. From the command line, call:
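The legacy invocation looks like this:

```bash
python setup.py sdist bdist_wheel
```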
Including the requirements file in the bundle (required for installation) requires adding it to a MANIFEST.in file (also at the project root).
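A minimal MANIFEST.in for this setup:

```text
include requirements.txt
```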
2b) Modern Python (build)
The preferred modern way of building a package is with the PyPA build package (installed from PyPI with pip).
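A sketch of the equivalent modern invocation:

```bash
python -m pip install --upgrade build
python -m build
```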
This builds both:
- <project>-0.1.0.tar.gz
- <project>-0.1.0-py3-none-any.whl
Wheels do not include the dependencies, only references for installation. Wheels are better for fast installation and for frozen dependencies (such as a release on a production pipeline).
Option 3) Zipping the source
DataProc supports Python code simply zipped into a .zip file. However, that .zip file still requires a top-level setup.py or pyproject.toml to instruct the package manager on how to handle the install. In this project, the pyproject.toml references the README.md file, so it also needs to be included in the bundle.
3a) Zipping with the pyproject.toml
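A sketch, assuming an src/ layout; adjust the paths to your project.

```bash
zip -r dist/my_project.zip src/ pyproject.toml README.md
```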
The README.md needs to be included in the bundle as the pyproject.toml makes reference to it.
3b) Zipping with the setup.py
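And the equivalent when using setup.py (the file list is illustrative):

```bash
zip -r dist/my_project.zip src/ setup.py MANIFEST.in requirements.txt
```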
Bundling the dependencies in the package
Unfortunately, the nature of Python is that a project inevitably ends up with a lot of dependencies. By default, pip installs these sequentially: querying an archive server, downloading the package, caching it, then installing it, all over the open internet. If you are not careful to explicitly version each dependency, pip will also try to rebuild the entire dependency tree on install, which can slow down startup by as much as 15 minutes per run.
In a release situation, where the dependencies are frozen and clusters are frequently spun up, it can make sense to include the dependencies within the package. The (larger) package is then copied once from Cloud Storage to the cluster’s file system, and everything is installed without any calls to the internet.
The only complication is that the bundled dependencies need to exactly match the target runtime, i.e. building a package on a Mac will pull Mac-compiled dependencies that will not run on a Linux server.
Pull the Docker image
Jump into a Debian-based Linux Docker image, ideally one that matches the Python version on the DataProc cluster:
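For example (the exact tag is an assumption; pick one matching your cluster image):

```bash
docker pull python:3.11-slim-bookworm
```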
With the image downloaded, create and connect to an interactive session in the container. Note: connect with bash, not the default Python entry point.
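For example, reusing the same image tag:

```bash
docker run -it --rm -v "$(pwd)":/app -w /app python:3.11-slim-bookworm bash
```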
Where:
- -v $(pwd):/app: mounts your current directory (pwd) to /app in the container.
- -w /app: sets the working directory inside the container to /app.
Install dependencies to a target directory
From inside the container, install the requirements into a local target directory:
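A sketch; the vendor directory name is just a convention.

```bash
# Install the pinned requirements into a local vendor/ directory
python -m pip install -r requirements.txt --target vendor
```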
This will then pull the exact requirement binary versions for Linux and store them in the vendor directory.
Again, we zip the project code, this time including the dependencies:
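For example (paths are illustrative):

```bash
zip -r dist/my_project.zip src/ vendor/ setup.py MANIFEST.in requirements.txt
```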
For this project the ZIP size went from ~80KB to 468MB with the usual data science libraries.
However, this should save a lot of time on installation, as there is no resolution required from pip, and there should be no network latency.
Benchmarking cluster initialization
To determine the optimal approach, a benchmarking exercise was conducted to compare different formats, dependency resolutions, and build systems.
Benchmark results
Format | Versioned Dependencies | Include Dependencies | Build System | Config | Build Size | DataProc Init Time |
---|---|---|---|---|---|---|
.tar.gz | No | No | Poetry | pyproject.toml | 80 KB | 25+ min |
.whl | No | No | Poetry | pyproject.toml | 80 KB | 7 min |
.whl | Yes | No | Poetry | pyproject.toml | 80 KB | 5 min |
.zip | No | No | Poetry .whl → zip | pyproject.toml | 80 KB | 25+ min |
.zip | No | Yes | Poetry .whl → zip | pyproject.toml | 468 MB | 22 min |
.zip | No | No | Poetry .whl → zip | setup.py | 80 KB | 12 min |
.whl | No | No | Python sdist | setup.py | 80 KB | 9 min |
.whl | No | No | Python build | setup.py | 80 KB | 9 min |
.zip | Yes | No | Zip | pyproject.toml | 80 KB | 25+ min |
.zip | Yes | Yes | Zip | pyproject.toml | 468 MB | 17 min |
.zip | Yes | No | Zip | setup.py | 80 KB | 12 min |
.zip | Yes | Yes | Zip | setup.py | 468 MB | 5 min |
Key takeaways
1) .whl is the most performant file format
.whl files were the most performant.
.zip files were the second best. Curiously, taking a .whl file and renaming it to a .zip almost doubled the cluster initialization time.
.tar.gz was by far the worst format, at more than 5x the install time before the cluster timed out.
2) Versioned dependencies vastly increase install speed
Explicit versioning saved approx. 30% of cluster initialization time across file formats.
Of course, the version resolution needs to be added to the deployment pipeline, but that only runs once per release and saves resolution every time a job is submitted.
3) Bundling the dependencies yields no real benefit
It was theorized that, as GCS and DataProc sit in the same data centre, moving a larger package internally would be faster than looking up and pulling ~100 libraries across the open internet. This was not the case. It may be that the DataProc service caches common libraries, or that pip can parallelize fetching and installing packages from the internet but not from a local directory.
4) pyproject.toml performs slightly better than setup.py
Not a significant difference, but it makes sense to avoid a legacy build system that is warned against and will shortly be deprecated; it would also pollute the project’s top-level directory with more files.
5) Modern build systems produce more performant builds
At first glance, the packages built by sdist, build, and poetry appear identical; however, there are slight differences which affect how the package manager handles them at install time. The newer systems install slightly faster.
6) Larger jobs are more performant than increasing parallelization
Given that the package install has to occur every time a job is submitted to a Spark cluster worker, care should be taken to balance the number of jobs against job size. From this investigation, increasing a job’s length by 5 minutes would be far more beneficial than creating a new job to do the same work.
For this reason, larger worker nodes utilizing a multithread approach would likely yield the best outcomes.
Passing runtime variables in DataProc and Airflow
Introduction
Using environment variables to pass secrets and configurations at runtime is a DevOps best practice that enhances security, flexibility, and maintainability. Hard-coding sensitive values in source code increases the risk of leaks, while environment variables keep secrets out of version control systems. This also enables dynamic configuration, allowing jobs to adapt across different environments (e.g., DEV, UAT, PRD) without modifying the application code.
In DataProc, variables can be set within Google Cloud Composer and injected into DataProc worker environments, allowing job scripts to retrieve these values dynamically.
Methods for passing runtime variables
Apache Spark provides multiple ways to pass variables to the runtime environment. The following are the four primary approaches, listed in order of preference.
OS-level initialization script (cluster startup)
Google Cloud’s recommended method is defining a startup script during cluster creation. This script is stored in Cloud Storage and executed as an initialization action.
Example command
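A sketch of the kind of command involved; the cluster name, region, and script path are placeholders.

```bash
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --initialization-actions=gs://<BUCKET>/scripts/startup.sh \
    --initialization-action-timeout=10m
```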
Using this method, environment variables can be appended to the system’s /etc/environment file; the script runs as a final initialization step on every machine added to the cluster.
Example startup script
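A sketch of such a script; the variable names and values are illustrative.

```bash
#!/bin/bash
set -euo pipefail

# Append variables so they are visible to all processes on the node
cat >> /etc/environment <<'EOF'
APP_ENV=dev
DATA_BUCKET=<BUCKET>
EOF
```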
Compute engine metadata (instance-level variables)
Since DataProc clusters are built on Google Compute Engine (GCE) instances, metadata can be passed to both head and worker nodes at provisioning time via the Google Cloud management layer.
Example command
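A sketch; the metadata keys and values are illustrative, and the curl call shows how a node can read them back from the metadata server.

```bash
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --metadata=APP_ENV=dev,DATA_BUCKET=<BUCKET>

# On any node, the values can then be read from the metadata server
curl -s -H "Metadata-Flavor: Google" \
    "http://metadata.google.internal/computeMetadata/v1/instance/attributes/APP_ENV"
```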
In a similar way, cluster-level Hadoop and Spark properties can be set at creation time:
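A sketch using the --properties flag; prefixed keys are written into the corresponding configuration files (spark-env: entries end up in spark-env.sh).

```bash
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --properties='spark-env:APP_ENV=dev,spark:spark.executorEnv.APP_ENV=dev'
```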
Spark properties (job submission-level variables)
If the environment variables need to be job-specific, they can be injected directly into the Spark runtime environment when submitting a job.
Example command
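A sketch; the property names target the executor and (in YARN cluster mode) driver environments.

```bash
gcloud dataproc jobs submit pyspark gs://<BUCKET>/jobs/main.py \
    --cluster=my-cluster \
    --region=us-central1 \
    --properties='spark.executorEnv.APP_ENV=dev,spark.yarn.appMasterEnv.APP_ENV=dev'
```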
From Airflow, the DataprocSubmitJobOperator supports passing job-level properties:
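A sketch of the corresponding job dictionary; the project, cluster, and variable names are illustrative.

```python
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

PROJECT_ID = "my-project"
REGION = "us-central1"

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": "spark-cluster-dev"},
    "pyspark_job": {
        "main_python_file_uri": "gs://<BUCKET>/jobs/main.py",
        "properties": {
            # Visible to executors as the APP_ENV environment variable
            "spark.executorEnv.APP_ENV": "dev",
            # Visible to the driver when running in YARN cluster mode
            "spark.yarn.appMasterEnv.APP_ENV": "dev",
        },
    },
}

submit_job = DataprocSubmitJobOperator(
    task_id="submit_pyspark_job",
    project_id=PROJECT_ID,
    region=REGION,
    job=PYSPARK_JOB,
)
```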
Some security policies may block custom environment variables from being passed using this method.
Command-line arguments (explicit variable passing)
If other methods fail due to security restrictions, environment variables can be passed explicitly as command-line arguments.
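A sketch; everything after the bare -- separator is forwarded to the script as arguments (names are illustrative).

```bash
gcloud dataproc jobs submit pyspark gs://<BUCKET>/jobs/main.py \
    --cluster=my-cluster \
    --region=us-central1 \
    -- --app-env=dev --data-bucket=<BUCKET>
```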
This requires modifying the entry point of the run script to accept the new argument:
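A minimal sketch of the added arguments:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--app-env", required=True)
parser.add_argument("--data-bucket", required=True)
args = parser.parse_args()
```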
Use a secrets manager
The most secure way to manage runtime variables is to store sensitive values in a secrets manager rather than passing them directly.
Why Use a Secrets Manager?
- Security: Keeps secrets out of logs, DAGs, and environment variables.
- Access Control: Secrets can be role-restricted to prevent unauthorized access.
- Versioning: Allows tracking changes to secrets over time.
- Auditing: Provides logging to track access attempts.
- Ease of coding: The same variables can be used across deployment environments, so long as each environment has its own Secret Manager.
Google Cloud Secret Manager
Google Cloud Secret Manager provides centralized, access-controlled storage for secrets. All we need to do is add Secret Manager access to the deployment Service Account and then we can replace most variables with a simple secret lookup.
1) Store a Secret
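For example (the secret name is a placeholder):

```bash
gcloud secrets create db-password --replication-policy="automatic"
```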
2) Set a value
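For example, piping the value in via stdin:

```bash
echo -n "s3cr3t-value" | gcloud secrets versions add db-password --data-file=-
```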
3) Retrieve a secret from Airflow
In Airflow DAGs, secrets can be accessed using Google Secret Manager Hook:
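A sketch; the hook class name has changed across google provider releases (newer versions expose GoogleCloudSecretManagerHook), so check the provider version pinned in your environment.

```python
from airflow.providers.google.cloud.hooks.secret_manager import SecretsManagerHook


def get_db_password() -> str:
    # Uses the environment's default GCP connection and service account
    hook = SecretsManagerHook(gcp_conn_id="google_cloud_default")
    return hook.get_secret(secret_id="db-password")
```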
4) Retrieve a Secret from DataProc
Alternatively, secrets can be accessed directly from within the DataProc environment using the Google Cloud Secret Manager Python client.
Ensure that the Secret Manager client library (google-cloud-secret-manager) is included in the project requirements and installed during cluster initialization.
Then, in your PySpark job, retrieve the secret:
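A sketch using the official client; the project and secret names are placeholders.

```python
from google.cloud import secretmanager


def get_secret(project_id: str, secret_id: str, version: str = "latest") -> str:
    # Uses the cluster's service account credentials via Application Default Credentials
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/{version}"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("UTF-8")


db_password = get_secret("my-project", "db-password")
```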
This method has the advantage of only pulling secrets when they are needed at runtime rather than at DAG execution time - this limits their scope to a single function or class rather than the entire execution environment.
Conclusion
When configuring runtime variables in DataProc and Airflow, the best method depends on security policies and operational requirements:
Method | Use Case |
---|---|
OS-Level Startup Script | Best for cluster-wide persistent variables |
Compute Engine Metadata | Works at the instance level; useful for per-node settings |
Spark Properties | Best for job-specific runtime variables |
Command-Line Arguments | A fallback option when security policies restrict variable injection |
Secrets Manager | The most secure option for sensitive values |
For sensitive data such as API keys, database credentials, and encryption secrets, Google Cloud Secret Manager should always be used.
This structured approach ensures secure and flexible runtime configuration across different environments.