DataProc - a (near) complete guide

This post is the summation of months of debugging and optimizing ML and AI workflows on Google Cloud DataProc clusters using Cloud Composer / Apache Airflow for orchestration.

If you would prefer to jump directly to a chapter:

  1. What is DataProc? And why do we need it?
  2. Environments, what’s shared and what’s local
  3. Environment variables
  4. Cluster configuration
  5. Cluster startup optimization
  6. Runtime variables

Otherwise, hold on tight, because here we go…


What is DataProc? And why do we need it?

Understanding Airflow and Spark in Depth

Google Cloud Composer is a managed version of Apache Airflow, an orchestration tool for defining, scheduling, and monitoring workflows. Workflows in Airflow are structured as Directed Acyclic Graphs (DAGs), which define a series of tasks and their dependencies.

Structure of an Airflow DAG

A typical Airflow DAG for DataProc involves three key steps:

  1. Provisioning a DataProc cluster to allocate compute resources.
  2. Executing the workload, typically a Spark job.
  3. Decommissioning the cluster to optimize resource usage and cost.

This approach ensures efficient resource utilization, as clusters exist only for the duration of processing. Managing cost and resource efficiency is critical in large-scale data workflows.

Here’s an overview of how such a workflow is structured:

Typical Airflow training job

Airflow provides a mechanism to define tasks and their relationships programmatically, ensuring that dependencies are correctly handled. The DataProc cluster itself is a managed Apache Spark environment on Google Cloud, similar to Amazon EMR, designed for scalable, distributed data processing.

What is Apache Spark?

Apache Spark is an open-source, distributed computing system optimized for large-scale data processing. It provides APIs in Scala, Java, Python, and R and operates efficiently using an in-memory computation model.

Key Advantages of Spark

  • In-Memory Processing: Unlike traditional disk-based processing systems like Hadoop MapReduce, Spark keeps intermediate data in memory, dramatically improving performance.
  • Lazy Evaluation: Spark constructs an execution plan before running computations, minimizing redundant operations.
  • Fault Tolerance: Spark’s Resilient Distributed Dataset (RDD) abstraction ensures data recovery in case of node failures.
  • DataFrame and SQL API: Higher-level abstractions simplify working with structured data, integrating SQL queries directly into Spark workflows.
  • Scalability: Spark can scale across thousands of nodes, making it suitable for enterprise-grade workloads.
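
To make a couple of these points concrete, here is a minimal PySpark sketch (the bucket path and column names are hypothetical). The filter and groupBy calls only build an execution plan; nothing runs until the show() action at the end.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("lazy-eval-demo").getOrCreate()

# Transformations are lazy - they only extend the execution plan
error_counts = (
    spark.read.parquet("gs://<bucket>/events/")      # hypothetical input path
    .filter(F.col("status") == "ERROR")
    .groupBy("service")
    .count()
)

# The action below triggers Spark to optimize and execute the plan across the cluster
error_counts.show()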

Integrating Airflow and DataProc for scalable workflows

Consider a scenario where a machine learning model requires training on a large dataset. The workflow follows these steps:

  1. Cluster Provisioning: A DataProc cluster is created with an appropriate configuration.
  2. Job Execution: A Spark job processes the dataset.
  3. Storage and Cleanup: Processed results are stored in Google Cloud Storage (GCS), and the cluster is shut down.
  4. Monitoring: Airflow tracks job status and handles failures (for example, via retries).
  5. Automated Scheduling: Recurring workflows can be scheduled for periodic execution.

Example Airflow DAG for DataProc

Below is an example Airflow DAG that automates this workflow:

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator
)
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
}

dag = DAG(
    'dataproc_workflow',
    default_args=default_args,
    description='Example DataProc workflow',
    schedule_interval=None,
)

create_cluster = DataprocCreateClusterOperator(
    task_id='create_cluster',
    project_id='my-project',
    cluster_config={},
    region='us-central1',
    dag=dag,
)

submit_job = DataprocSubmitJobOperator(
    task_id='submit_spark_job',
    job={},
    region='us-central1',
    project_id='my-project',
    dag=dag,
)

delete_cluster = DataprocDeleteClusterOperator(
    task_id='delete_cluster',
    project_id='my-project',
    cluster_name='example-cluster',
    region='us-central1',
    dag=dag,
)

create_cluster >> submit_job >> delete_cluster
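
The cluster_config and job arguments above are left as empty placeholders. As a hedged sketch of what a minimal cluster_config might contain (machine types and worker counts are illustrative only), the DataProc API accepts a nested dictionary along these lines:

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "e2-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "e2-standard-2"},
    "software_config": {"image_version": "2.2-debian12"},
}

A fuller, reusable way to build this dictionary with ClusterGenerator is covered in the cluster configuration chapter.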

Optimization strategies for cost and performance

Efficiently managing DataProc clusters and Airflow workflows can significantly impact cost and performance. Consider the following:

  • Auto-scaling: Dynamically allocate resources based on workload demand.
  • Pre-emptible Instances: Use cost-effective, short-lived VMs to lower computational costs.
  • Persistent vs. Ephemeral Clusters: Persistent clusters reduce spin-up time but may be more expensive if underutilized.
  • Storage Optimization: Compress and clean up unused data to minimize storage costs.
  • Spot Instances: Some cloud providers offer excess compute capacity at reduced prices.
  • Resource Monitoring: Track CPU and memory usage to identify bottlenecks and improve efficiency.

Conclusion

By leveraging Google Cloud Composer (Airflow) and DataProc (Spark), organizations can automate complex data workflows efficiently. The combination of Airflow for orchestration and Spark for distributed computing provides a scalable, cost-effective solution for big data processing.

With optimized configurations and efficient resource allocation, these tools enable businesses to process massive datasets while minimizing costs. Whether for log processing, machine learning training, or ETL pipelines, this setup offers high performance, automation, and scalability.


Understanding environments: shared vs local

Conceptual overview of Airflow and Spark environments

Google Cloud Composer, a managed Apache Airflow, orchestrates workflows using Directed Acyclic Graphs (DAGs). DAGs define a sequence of tasks and their dependencies, ensuring reliable workflow execution.

A typical Composer DAG follows three primary steps:

  1. Provisioning a DataProc cluster
  2. Executing the workload
  3. Tearing down the cluster

This structure ensures efficient resource utilization and cost optimization.

Typical Airflow training job

How Airflow and DataProc work together

A DataProc cluster is a Google Cloud-managed service that transforms Compute Instance resources into an Apache Spark cluster for processing large-scale data. When Airflow provisions a cluster, it effectively creates a secondary execution environment within a dedicated Virtual Private Cloud (VPC).

Relationship between Airflow and DataProc environments

Configuring the DataProc environment

DataProc clusters can be provisioned using:

  • Google Cloud Console
  • Command-line tools
  • Airflow’s DataProcClusterOperator

At cluster creation, several background operations are automatically executed, including:

  • Provisioning head node(s)
  • Installing the Hadoop Distributed File System (HDFS)
  • Configuring YARN for resource scheduling
  • Establishing internal networking

Provisioning worker nodes

Once the Spark cluster is initialized, worker nodes are dynamically provisioned based on the specified configuration. Each worker node:

  1. Pulls the base DataProc image
  2. Starts the Spark service
  3. Runs initialization scripts

This process repeats for each requested worker node.

Worker nodes undergo health checks before registering with head nodes to receive jobs.

Executing PySpark Jobs

Within the cluster, Python code runs as PySpark jobs on worker nodes. Each worker node requires a Python environment, ensuring compatibility with submitted scripts. Once all jobs are processed, Airflow enters the Teardown phase, deallocating resources.
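
For orientation, the PySpark job submitted by Airflow is just an ordinary Python script that obtains a SparkSession; a minimal sketch (paths and column names are hypothetical):

from pyspark.sql import SparkSession

def main() -> None:
    # On a DataProc node this attaches to the cluster's existing Spark configuration
    spark = SparkSession.builder.appName("example-job").getOrCreate()

    df = spark.read.parquet("gs://<bucket>/input/")          # hypothetical input
    summary = df.groupBy("site").count()
    summary.write.mode("overwrite").parquet("gs://<bucket>/output/")

    spark.stop()

if __name__ == "__main__":
    main()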

Configuration scope and security considerations

When tuning an Airflow-Spark system, it’s essential to recognize that settings and parameters exist at multiple levels. Each layer—including Airflow, DataProc, Spark, and individual worker nodes—can influence execution performance. Additionally, security policies can be enforced at various levels, affecting deployment strategies and access control.

Conclusion

Understanding shared and local environments within Airflow and DataProc is essential for designing scalable, cost-effective workflows. By correctly configuring clusters, defining resource scopes, and implementing security policies, teams can optimize performance while maintaining a robust orchestration framework.


Passing environment variables in DataProc and Composer

Executive summary

A common design pattern for orchestrating AI and ML pipelines leverages Google Cloud DataProc to execute Spark-based workflows, offering scalable compute resources for distributed data processing.

According to Google’s marketing materials, DataProc offers:

Quote

Serverless deployment, logging, and monitoring let you focus on your data and analytics, not on your infrastructure.

As data practitioners, we may not always have access to all cloud resources, including direct access to DataProc logs, cluster monitoring tools, or SSH access to cluster instances. Security policies or organizational controls often restrict these capabilities, requiring us to rely on orchestration tools like Airflow and alternative logging or monitoring approaches.

A common design pattern for Google Cloud clients is to use Google Cloud Composer, a managed version of Apache Airflow, to orchestrate and schedule Spark jobs. Each Composer environment is linked to a dedicated Google Cloud Storage (GCS) bucket, which is mounted as a local filesystem (/home/airflow/gcs) on the Airflow workers and is accessible to the DataProc cluster via gs:// paths.

Standard workflow

The typical workflow follows these steps:

  1. Initialize a Directed Acyclic Graph (DAG) in Airflow.
  2. The initial node provisions a new Spark cluster.
  3. Subsequent nodes submit PySpark jobs to the cluster.
  4. The final node tears down the cluster.

The Airflow DataProc operators make it possible to orchestrate this entire workflow using Python code that runs within the Composer environment.

Here’s an example DAG that reflects this pattern:

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)

with DAG(
    default_args=get_default_args(),
    dag_id="project.env.use_case.inference",
    tags=["Inference"],
) as inference:

    # Create the DataProc cluster
    infra_dataproc_cluster_up = DataprocCreateClusterOperator(
        task_id="infra_dataproc_cluster_up",
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT,
        region=REGION,
        cluster_config=CLUSTER_CONFIG,
        trigger_rule="all_done",
    )

    # ML task
    inference = DataprocSubmitJobOperator(
        task_id=f"infer_{use_case}",
        region=REGION,
        project_id=PROJECT,
        job={
            "placement": {"cluster_name": CLUSTER_NAME},
            "pyspark_job": {
                "main_python_file_uri": f"gs://{BUCKET}/dags/code/predict.py",
            },
        },
        trigger_rule="all_success",
    )

    # Teardown the cluster
    infra_dataproc_cluster_down = DataprocDeleteClusterOperator(
        task_id="infra_dataproc_cluster_down",
        project_id=PROJECT,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule="all_done",
    )

    infra_dataproc_cluster_up >> inference >> infra_dataproc_cluster_down

WARNING - Naming restrictions

DataProc cluster names may only contain lowercase letters, numbers, and dashes. Avoid underscores (_) and periods (.) in cluster names, as they can cause silent failures in Google Cloud operations. Airflow DAG IDs are more permissive, but keeping both naming schemes simple avoids surprises.

The DAG.py file is uploaded to the Composer dags directory in the associated GCS bucket (gs://<BUCKET>/dags). Airflow polls this directory every minute, and new files automatically appear in the UI. When updating a DAG, it’s good practice to confirm that the new code is visible under the Code tab before triggering a run.


Managing composer variables

To enhance security and maintain consistency across environments, many variables are injected into the DAG runtime environment directly from the Composer instance. This allows the DAG to dynamically retrieve these values using either Python’s built-in os.getenv() or Airflow’s airflow.models.Variable class.
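
As a small illustration (the variable names are hypothetical), the two retrieval styles look like this inside a DAG file:

import os
from airflow.models import Variable

# An environment variable injected into the Composer workers
env_name = os.getenv("ENV", "dev")

# An Airflow Variable managed via Admin > Variables or a variables.json import
bucket = Variable.get("BUCKET", default_var=None)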

To support reproducibility and deployment automation, projects may include a variables.json file in their source repository. This file can be deployed to Composer through several methods.
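
The file itself is plain JSON mapping variable names to values; a hypothetical example:

{
    "ENV": "dev",
    "PROJECT": "<project>",
    "REGION": "us-central1",
    "BUCKET": "<bucket>"
}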

Deploying variables

Method 1: Deploy to the GCS bucket

Airflow supports automatically importing variables from a variables.json file placed in gs://<BUCKET>/data/. Adding a command to the deployment pipeline, such as .gitlab-ci.yml or .github/workflows/release.yml, ensures the file is copied during deployment:

gsutil cp infra/variables.json gs://<BUCKET>/data/variables.json

At present, automatic parsing of this file is restricted, though this may change in higher environments with appropriate approval.

Method 2: Import via command line

The variables can also be directly imported using the gcloud CLI:

gcloud composer environments run \
    <COMPOSER_ENVIRONMENT> \
    --location=<REGION> \
    variables import \
    -- \
    /home/airflow/gcs/data/variables.json

This approach would require CI/CD runner permissions to authenticate, upload, and apply the variables. Currently, these permissions are not granted to the service account.

Method 3: Manual import via web interface

Variables can also be managed directly through the Airflow UI.

  • Navigate to Admin > Variables.
  • Manually enter key-value pairs or upload the variables.json file directly.

Screenshot of the Airflow variables menu

The uploaded file’s variables will appear in the table.

Screenshot of the Airflow variables screen

Preferred approach

Due to the infrequent nature of variable changes and the current lack of automation permissions, manually uploading variables through the Airflow UI is the preferred method. This also improves operational security, as no sensitive bucket names or regions need to be stored in the source repository.

This approach balances flexibility, security, and operational control while ensuring variables are correctly injected into the Composer environment at runtime.


DataProc cluster configuration

Introduction

In many DataProc projects, jobs are executed under nearly identical conditions — they typically have low memory and CPU requirements and access the same data sources.

To avoid duplicating boilerplate configuration across multiple DAGs, a shared configuration file can be created under the /dag directory. Each environment (e.g., development, UAT, production) can have its own <env>.py file that defines a standard cluster configuration. This shared configuration can then be imported into all relevant DAGs, with project-specific overrides applied when necessary.

Example shared configuration file

from typing import Any
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator
from airflow.utils.dates import days_ago

def get_base_cluster_config() -> ClusterGenerator:
    return ClusterGenerator(
        project_id=Variable.get("PROJECT"),
        region=Variable.get("REGION"),
        master_machine_type="e2-standard-2",
        worker_machine_type="e2-standard-2",
        num_workers=2,
        image_version="2.2-debian12",
        service_account="sa-<project>-<env>@<project>.iam.gserviceaccount.com",
        subnetwork_uri="https://www.googleapis.com/compute/v1/projects/<client>/regions/<region>/subnetworks/snet-<subnet>",
        internal_ip_only=True,
        auto_delete_ttl=1800,
        tags=["<project>"],
    )

def get_default_args() -> dict[str, Any]:
    return {
        "owner": "airflow",
        "depends_on_past": False,
        "schedule_interval": "@daily",
        "start_date": days_ago(1),
        "catchup": False,
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
    }

Number of workers

The minimum number of workers (num_workers) must be greater than 1. DataProc requires at least two workers to form a fully functional cluster.

Clusters configured with only a single worker will pass initial DAG validation and begin provisioning, but will ultimately time out with an unclear error message.


Cluster timeout (auto_delete_ttl)

The auto_delete_ttl parameter is often misunderstood. It defines the total lifetime of the cluster, not an idle timeout.

If this value is set below 10 minutes (600 seconds), the DAG will validate successfully and cluster provisioning will start, but the cluster will silently terminate without producing any meaningful error messages.

Typical startup durations

For small Spark clusters with two standard workers in North American regions, observed provisioning times are:

  • Minimum startup time: Approximately 7 minutes
  • Cold start (e.g., Monday morning): Up to 15 minutes

Package installation is slow

Clusters that install common data science libraries such as NumPy, scikit-learn, and pandas will often exceed 12.5 minutes for startup, even under ideal conditions.

Poorly optimized initialization scripts — especially those with complex or unconstrained pip install commands — can easily push startup times beyond 10 minutes.

Set cluster timeouts high

Despite natural cost-saving instincts, I strongly recommend setting the auto_delete_ttl parameter significantly higher than you might expect — 20 to 30 minutes is a reasonable default.

For DAGs involving model training jobs, you may need to extend the timeout to several hours.

Because all DAGs include a cluster teardown task using trigger_rule="all_done", a generously high timeout does not pose a risk of excessive costs. The cluster will always be cleaned up automatically, even if the job fails or gets stuck.


DAG structure

With the previous configuration steps complete, the DAG itself follows a standard structure:

  1. Pull key variables from the Composer environment, including the environment name, region, and bucket.
  2. Retrieve the default cluster configuration from a shared submodule.
  3. Load default DAG arguments from the shared submodule.
  4. Assemble the DAG: cluster spin-up → PySpark job submission → cluster teardown.

Example DAG structure

from airflow import DAG
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from shared.dev import get_base_cluster_config, get_default_args

ENV = Variable.get("ENV")
PROJECT = Variable.get("PROJECT")
REGION = Variable.get("REGION")
BUCKET = Variable.get("BUCKET")

cluster_config = get_base_cluster_config()
cluster_config.metadata = [("ENV", ENV), ("BUCKET", BUCKET)]
CLUSTER_CONFIG = cluster_config.make()
CLUSTER_NAME = f"<project>-{ENV}-<use case>-inf"

with DAG(
    default_args=get_default_args(),
    dag_id="project.env.use_case.inference",
    tags=["Inference"],
) as inference:
    infra_dataproc_cluster_up = DataprocCreateClusterOperator(
        task_id="infra_dataproc_cluster_up",
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT,
        region=REGION,
        cluster_config=CLUSTER_CONFIG,
        trigger_rule="all_done",
    )

    inference = DataprocSubmitJobOperator(
        task_id=f"infer_{use_case}",
        region=REGION,
        project_id=PROJECT,
        job={
            "placement": {"cluster_name": CLUSTER_NAME},
            "pyspark_job": {
                "main_python_file_uri": f"gs://{BUCKET}/dags/code/predict.py",
            },
        },
        trigger_rule="all_success",
    )

    infra_dataproc_cluster_down = DataprocDeleteClusterOperator(
        task_id="infra_dataproc_cluster_down",
        project_id=PROJECT,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule="all_done",
    )

    infra_dataproc_cluster_up >> inference >> infra_dataproc_cluster_down

Important considerations

Cluster name restrictions

Cluster names may only contain lowercase letters, numbers, and dashes. Avoid underscores, periods, and uppercase characters, as they will silently break cluster provisioning.

Minimal dependencies in DAG files

DAG files should avoid direct imports of business logic. Since DAGs are parsed by the Composer environment (a minimal Python environment), adding dependencies requires formal approval and complicates deployment.

Code packaging

Business logic should be packaged as a .whl, .zip, or .tar.gz file and referenced using the python_file_uris argument in the job configuration.


Job submission and argument handling

Because Airflow/Composer runs in a separate environment from the DataProc/Spark cluster, the DAG file cannot import code or configuration from the job package. Instead, all arguments must be explicitly passed in the DataprocSubmitJobOperator job configuration as a valid DataProc job dictionary.

job={
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": f"gs://{BUCKET}/dags/code/predict.py",
        "python_file_uris": [PKG],
        "args": [
            "--site", site,
            "--start_date", PARAMS.start_date.strftime("%Y-%m-%d %H:%M"),
            "--end_date", PARAMS.end_date.strftime("%Y-%m-%d %H:%M"),
        ],
    },
}

Reworking existing code

Business logic must be restructured to accept command-line arguments using argparse, rather than relying on imported configurations.

...

def parse_args() -> argparse.Namespace:  
    parser = argparse.ArgumentParser(description="Predict anomalies for XYZ use case.")  

    parser.add_argument("--site", required=True, help="The site location ID.")  
    parser.add_argument("--start_date", required=True, help="The start date to infer over.")  
    parser.add_argument("--end_date", required=True, help="The end date of the inference window.")  

    return parser.parse_args()

def main():
    args = parse_args()
    conf = ConfigStore(use_case=<use case>)
    repository = DataRepository(
        api=conf.api,
        dataset=conf.dataset,
    )

    # Pre-existing entry point
    predict(
        use_case=<use case>,
        site=args.site,
        start_date=datetime.strptime(args.start_date, "%Y-%m-%d %H:%M"),
        end_date=datetime.strptime(args.end_date, "%Y-%m-%d %H:%M"),
        repo=repository,
    )


if __name__ == "__main__":
    main()

DataProc cluster startup optimization

Introduction

Cluster startup time is a crucial factor in DataProc job efficiency. When a DataProc cluster is provisioned, worker nodes are allocated from a Compute Instance pool. The provisioning time depends on machine type availability in the compute region. Larger memory machines typically take longer to provision due to lower availability in the data center.

Once provisioned, workers automatically apply the DataProc image, which includes Java, Scala (for Spark), and Python 3.11 with Conda. However, no additional libraries are installed, meaning all project dependencies must be configured before executing any job.

To streamline the startup process, the initialization script follows three key steps:

  1. OS package update (apt-get)
  2. Install OS binaries (e.g., required system dependencies)
  3. Install Python requirements

Optimizing dependency installation

There are multiple ways to install Python dependencies within a DataProc runtime. The choice of method significantly impacts cluster startup time.

The key options include:

  • Installing from a requirements file
  • Installing from a built Python package
  • Using Poetry for dependency resolution

Option 1: Installing from a requirements file

#!/bin/bash  

# Install required OS libraries  
apt-get update -y  
apt-get install --yes make coinor-cbc coinor-libcbc-dev libgomp1  

# Variables  
PACKAGE_BUCKET="gs://<bucket>/data"  
LOCAL_PACKAGE_DIR="/opt/app"  

# Copy the requirements file from GCS  
gsutil cp $PACKAGE_BUCKET/requirements.txt ./requirements.txt  

# Install dependencies  
pip install -r requirements.txt

Option 2: Installing from a .whl Package

#!/bin/bash  

# Install required OS libraries  
apt-get update -y  
apt-get install --yes make coinor-cbc coinor-libcbc-dev libgomp1  

# Variables  
PACKAGE_BUCKET="gs://<bucket>/data"  
LATEST_PACKAGE=$(gsutil ls "$PACKAGE_BUCKET/<project>-*-py3-none-any.whl" | tail -1)   
LOCAL_DIR="/opt/app"  

# Copy the package from GCS  
mkdir -p $LOCAL_DIR  
gsutil cp $LATEST_PACKAGE $LOCAL_DIR    

# Install dependencies  
cd $LOCAL_DIR  
ls -1 <project>-*-py3-none-any.whl | xargs pip install

Final optimization strategy

Based on extensive testing, the most efficient approach was determined to be:

#!/bin/bash  

# Variables  
REQUIREMENTS="gs://<bucket>/data/conf/requirements.txt"  

# Copy and install dependencies  
gsutil cp $REQUIREMENTS ./requirements.txt  
pip install -r requirements.txt

Note

The COIN-OR installation could be removed because a version update to the data science libraries meant that the solver binaries are now bundled inside the Python packages themselves.

Key optimizations

  1. Dropped apt-get updates: Eliminating OS package updates significantly reduces startup time (~3 minutes saved)
  2. Explicit Dependency Versioning: Pre-resolving dependency versions avoids unnecessary resolution during installation.
  3. Using pip Over poetry or conda: pip provides the fastest installation for pre-versioned dependencies.
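
As a quick illustration of point 2, a pre-resolved requirements file pins every dependency so pip has nothing left to resolve at cluster startup (the versions below are placeholders, not recommendations):

numpy==1.26.4
pandas==2.1.4
scikit-learn==1.4.2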

Python package management for DataProc

When submitting PySpark jobs, dependencies must be packaged for the Spark environment. Unlike cluster-wide dependencies installed via startup scripts, these packages are defined at the job level.

Five methods for building a Python package

Option 1) Poetry build

With no changes to the project, call poetry to build a wheel using the pyproject.toml file.

poetry build -f wheel

A requirements file is still required for cluster setup. To export the dependencies:

poetry export \
    -f requirements.txt \
    --with data \
    --output requirements.txt \
    --without-hashes

Here we use --with to select the dependency groups that are required.
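
For reference, a dependency group such as the data group used above is declared in pyproject.toml roughly as follows (the group and package names are illustrative):

[tool.poetry.group.data.dependencies]
pandas = "^2.1"
scikit-learn = "^1.4"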

Important

The --without-hashes flag is important for producing a requirements file that is OS-agnostic. Otherwise, a requirements file built on a Mac will not be able to resolve matching dependencies on a Linux-based CI pipeline.

Option 2) Use Python’s built-in tools

Create a basic setup.py:

import os  
from setuptools import find_packages, setup  

def read_requirements():  
    req_path = os.path.join(os.path.dirname(__file__), "requirements.txt")  
    if os.path.exists(req_path):  
        with open(req_path) as f:  
            return f.read().splitlines()  
    return []  

setup(  
    name="<project>",  
    version="0.1.0",  
    install_requires=read_requirements(),  
    packages=find_packages(),  
)

With the setup file we can then build.

2a) Old method (setuptools)

The traditional way of building a Python package is to invoke setup.py directly via setuptools. This approach is now discouraged and will print deprecation warnings on the command line when executed.

From the command line call

python setup.py sdist

Including the requirements file in the bundle (required for installation) means adding it to a MANIFEST.in file at the project root.

include requirements.txt

2b) Modern python (build)

The preferred modern way of building a package is with the build package (installed via pip install build).

python -m build

This builds both

  • <project>-0.1.0.tar.gz
  • <project>-0.1.0-py3-none-any.whl

Wheels do not include the dependencies, only references for installation.

Tip

Wheels are better for fast installation and when there are frozen dependencies (such as a release on a production pipeline)

Option 3) Zipping the source

DataProc supports Python code simply zipped into a .zip file. However, that .zip file still requires a top-level setup.py or pyproject.toml to instruct the package manager on how to handle the install.

In this project, the pyproject.toml references the README.md file, so it also needs to be included in the bundle.

3a) Zipping with the pyproject.toml

zip -r \
    dist/<project>-0.1.0-py3-none-any.zip \
    <project or src>/ \
    pyproject.toml \
    README.md

Note

The README.md needs to be included in the bundle as the pyproject.toml makes reference to it.

3b) Zipping with the setup.py

zip -r \
    dist/<project>-0.1.0-py3-none-any.zip \
    <project or src>/ \
    setup.py

Bundling the dependencies in the package

Unfortunately, the nature of Python is that a project inevitably ends up with a lot of dependencies. By default, pip installs these sequentially: it queries an index server, downloads each package, caches it, and then installs it - all over the open internet. If you are not careful to explicitly version each dependency, pip will also try to resolve the entire dependency tree on every install, which can slow cluster startup by as much as 15 minutes per run.

In a release situation, where the dependencies are frozen and clusters are frequently spun up, it can make sense to include the dependencies within the package. The (larger) package is then copied from Cloud Storage just once per cluster, and everything is installed without any calls out to the open internet.

The only complication is that the bundled dependencies need to exactly match the target runtime: building a package on a Mac will pull Mac-compiled dependencies that will not run on a Linux server.

Pull the Docker image

Jump into a Debian-based Linux Docker image, ideally one that matches the Python version on the DataProc cluster.

docker pull python:3.11-slim

With the image downloaded, create and connect to an interactive session. Note that we connect with bash rather than the default Python entry point.

docker run -it --rm -v $(pwd):/app -w /app python:3.11-slim bash

Where,

  • $(pwd):/app: Mounts your current directory (pwd) to /app in the container.
  • -w /app: Sets the working directory inside the container to /app.

Install dependencies to a target directory

From inside the container, install the requirements into a target directory:

pip install -r requirements.txt --target ./vendor

This will then pull the exact requirement binary versions for Linux and store them in the vendor directory.

Again, we zip the project code, this time including the dependencies:

zip -r \
    dist/<project>-0.1.0-py3-none-any.zip \
    <project or src>/ \
    vendor/ \
    pyproject.toml \
    README.md

Warning

For this project the ZIP size went from ~80 KB to 468 MB with the usual data science libraries.

However, this should save a lot of time on installation, as there is no resolution required from pip, and there should be no network latency.


Benchmarking cluster initialization

To determine the optimal approach, a benchmarking exercise was conducted to compare different formats, dependency resolutions, and build systems.

Benchmark results

| Format  | Versioned Dependencies | Include Dependencies | Build System      | Config         | Build Size | DataProc Init Time |
|---------|------------------------|----------------------|-------------------|----------------|------------|--------------------|
| .tar.gz | No                     | No                   | Poetry            | pyproject.toml | 80 KB      | 25+ min            |
| .whl    | No                     | No                   | Poetry            | pyproject.toml | 80 KB      | 7 min              |
| .whl    | Yes                    | No                   | Poetry            | pyproject.toml | 80 KB      | 5 min              |
| .zip    | No                     | No                   | Poetry .whl → zip | pyproject.toml | 80 KB      | 25+ min            |
| .zip    | No                     | Yes                  | Poetry .whl → zip | pyproject.toml | 468 MB     | 22 min             |
| .zip    | No                     | No                   | Poetry .whl → zip | setup.py       | 80 KB      | 12 min             |
| .whl    | No                     | No                   | Python sdist      | setup.py       | 80 KB      | 9 min              |
| .whl    | No                     | No                   | Python build      | setup.py       | 80 KB      | 9 min              |
| .zip    | Yes                    | No                   | Zip               | pyproject.toml | 80 KB      | 25+ min            |
| .zip    | Yes                    | Yes                  | Zip               | pyproject.toml | 468 MB     | 17 min             |
| .zip    | Yes                    | No                   | Zip               | setup.py       | 80 KB      | 12 min             |
| .zip    | Yes                    | Yes                  | Zip               | setup.py       | 468 MB     | 5 min              |

Key takeaways

1) .whl is the most performant file format

.whl’s were the most performant.

.zip’s were the second best. Curiously, taking a .whl file and renaming it to a .zip almost doubled the cluster initialization.

.tar.gz was by far the worst format and more than 5x the install time before the cluster timed out.

2) Versioned dependencies vastly increase install speed

Explicit versioning saved approx. 30% of cluster initialization time across file formats.

Of course, the version resolution needs to be added to the deployment pipeline - but that runs only once per release, and it saves a resolution step every time a job is submitted.

3) Bundling the dependencies yields no real benefit

It was theorized that, as GCS and DataProc would be in the same data centre, moving a larger package internally would be faster than looking up and pulling ~100 libraries across the open internet. This was not the case. It might be that the DataProc service is providing a certain amount of caching of common libraries, or that pip can multithread the fetch and install of packages from the internet, but not locally.

4) pyproject.toml performs slightly better than setup.py

Not a significant difference, but it makes sense to avoid a legacy build system that is warned against and will shortly be deprecated - the setup.py approach also pollutes the project's top-level directory with extra files.

5) Modern build systems produce more performant builds

At first glance, the packages built by sdist, build and poetry appear identical - however, there are slight differences which affect their handling by the package manager at install. The new systems install slightly faster.

6) Larger jobs are more performant than increasing parallelization

Given that the package install occurs every time a job is submitted to a Spark cluster worker, care should be taken to balance the number of jobs against job size. From this investigation, extending an existing job by 5 minutes is far more beneficial than creating a new job to do the same work.

For this reason, larger worker nodes utilizing a multithread approach would likely yield the best outcomes.


Passing runtime variables in DataProc and Airflow

Introduction

Using environment variables to pass secrets and configurations at runtime is a DevOps best practice that enhances security, flexibility, and maintainability. Hard-coding sensitive values in source code increases the risk of leaks, while environment variables keep secrets out of version control systems. This also enables dynamic configuration, allowing jobs to adapt across different environments (e.g., DEV, UAT, PRD) without modifying the application code.

In DataProc, variables can be set within Google Cloud Composer and injected into DataProc worker environments, allowing job scripts to retrieve these values dynamically.


Methods for passing runtime variables

Apache Spark provides multiple ways to pass variables to the runtime environment. The following are the five primary approaches, listed in order of preference.

OS-level initialization script (cluster startup)

Google Cloud’s recommended method is defining a startup script during cluster creation. This script is stored in Cloud Storage and executed as an initialization action.

Example command

gcloud dataproc clusters create <NAME> \
    --region=${REGION} \
    --initialization-actions=gs://<bucket>/startup.sh \
    --initialization-action-timeout=10m \
    ... other flags ...

Using this method, environment variables can be appended to the system's /etc/environment file. The initialization script runs as the final setup step on every machine added to the cluster, so the variables become available on each node.

Example startup script

#!/usr/bin/env bash

echo "FOO=BAR" >> /etc/environment
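
On the job side, anything written to /etc/environment in this way can then be read like any other environment variable; a minimal sketch:

import os

# FOO was appended to /etc/environment by the initialization script above
foo = os.environ.get("FOO", "unset")
print(f"FOO={foo}")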

Compute engine metadata (instance-level variables)

Since DataProc clusters are built on Google Compute Engine (GCE) instances, metadata can be passed to both head and worker nodes at provisioning time via the Google Cloud management layer.

Example command

gcloud dataproc clusters \
    create <NAME> \
    --metadata foo=BAR,startup-script-url=gs://<bucket>/startup.sh

Cluster properties can similarly be used to set Hadoop and Spark environment variables:

gcloud dataproc clusters \
    create <NAME> \
    --properties hadoop-env:FOO=hello,spark-env:BAR=world
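
Inside a running node, custom metadata values can be read back from the GCE metadata server. A sketch using only the standard library (the key foo matches the command above):

import urllib.request

def get_instance_metadata(key: str) -> str:
    """Read a custom metadata attribute from the GCE metadata server."""
    url = f"http://metadata.google.internal/computeMetadata/v1/instance/attributes/{key}"
    request = urllib.request.Request(url, headers={"Metadata-Flavor": "Google"})
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")

foo = get_instance_metadata("foo")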

Spark properties (job submission-level variables)

If the environment variables need to be job-specific, they can be injected directly into the Spark runtime environment when submitting a job.

Example command

gcloud dataproc jobs \
    submit spark \
    --properties spark.executorEnv.FOO=world

From Airflow, the DataprocSubmitJobOperator supports passing job-level properties:

DataprocSubmitJobOperator(
    task_id="example_task",
    region=REGION,
    project_id=PROJECT,
    job={
        "placement": {"cluster_name": CLUSTER_NAME},
        "pyspark_job": {
            "main_python_file_uri": f"gs://{BUCKET}/dags/predict.py",
            "python_file_uris": [PKG],
            "args": [
                "--store_key", store,
                "--start_date", PARAMS.start_date.strftime("%Y-%m-%d %H:%M"),
                "--end_date", PARAMS.end_date.strftime("%Y-%m-%d %H:%M"),
            ],
            "properties": {
                "spark.executorEnv.ENV": ENV,
                "spark.executorEnv.PROJECT": PROJECT,
                "spark.executorEnv.REGION": REGION,
                "spark.executorEnv.BUCKET": BUCKET,
            },
        },
    },
)

Warning

Some security policies may block custom environment variables from being passed using this method.
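
On the consuming side, a variable injected via spark.executorEnv becomes an ordinary environment variable in the executor processes, so executor-side code (for example, inside a UDF) can read it. A rough sketch, assuming the ENV variable was injected as above:

import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

@F.udf(returnType=StringType())
def read_env_tag(value: str) -> str:
    # Runs on the executors, where spark.executorEnv.ENV was set
    return os.environ.get("ENV", "unknown")

df = spark.createDataFrame([("a",), ("b",)], ["value"])
df.withColumn("env", read_env_tag("value")).show()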


Command-line arguments (explicit variable passing)

If other methods fail due to security restrictions, environment variables can be passed explicitly as command-line arguments.

DataprocSubmitJobOperator(
    task_id="example_task",
    region=REGION,
    project_id=PROJECT,
    job={
        "placement": {"cluster_name": CLUSTER_NAME},
        "pyspark_job": {
            "main_python_file_uri": f"gs://{BUCKET}/dags/predict.py",
            "python_file_uris": [PKG],
            "args": [
                "--site", site,
                "--start_date", PARAMS.start_date.strftime("%Y-%m-%d %H:%M"),
                "--end_date", PARAMS.end_date.strftime("%Y-%m-%d %H:%M"),
                "--env",  # NEW ARGUMENT ADDED
                ENV,
            ],
        },
    },
)

This requires modifying the entry point of the run script to accept the new argument:

parser.add_argument("--env", required=False, default="local", help="Runtime environment.")

Use a secrets manager

The most secure way to manage runtime variables is to store sensitive values in a secrets manager rather than passing them directly.

Why Use a Secrets Manager?

  • Security: Keeps secrets out of logs, DAGs, and environment variables.
  • Access Control: Secrets can be role-restricted to prevent unauthorized access.
  • Versioning: Allows tracking changes to secrets over time.
  • Auditing: Provides logging to track access attempts.
  • Ease of coding: The same variables can be used across deployment environments, so long as each environment has its own Secret Manager.

Google Cloud Secret Manager

Google Cloud Secret Manager provides centralized, access-controlled storage for secrets. All we need to do is add Secret Manager access to the deployment Service Account and then we can replace most variables with a simple secret lookup.

1) Store a Secret

gcloud secrets create MY_SECRET --replication-policy="automatic"

2) Set a value

echo -n "my_secret_value" | gcloud secrets versions add MY_SECRET --data-file=-

3) Retrieve a secret from Airflow

In Airflow DAGs, secrets can be accessed using Google Secret Manager Hook:

from airflow.providers.google.cloud.hooks.secret_manager import SecretManagerHook

def get_secret(secret_name: str):
    hook = SecretManagerHook()
    return hook.get_secret(secret_name)

MY_SECRET_VALUE = get_secret("MY_SECRET")

4) Retrieve a Secret from DataProc

Alternatively, secrets can be accessed directly from within the DataProc environment using the Google Cloud Secret Manager Python client.

Ensure that the Secret Manager client library (google-cloud-secret-manager) is included in the project requirements and installed during cluster initialization.

Then, in your PySpark job, retrieve the secret:

from google.cloud import secretmanager

def get_secret(secret_name: str, project: str) -> str:
    """Retrieve a secret from Google Cloud Secret Manager."""
    client = secretmanager.SecretManagerServiceClient()
    secret_path = f"projects/{project}/secrets/{secret_name}/versions/latest"
    response = client.access_secret_version(request={"name": secret_path})
    return response.payload.data.decode("UTF-8")

# Example usage
SECRET_NAME = "MY_SECRET"

secret_value = get_secret(SECRET_NAME, PROJECT)

This method has the advantage of only pulling secrets when they are needed at runtime rather than at DAG execution time - this limits their scope to a single function or class rather than the entire execution environment.


Conclusion

When configuring runtime variables in DataProc and Airflow, the best method depends on security policies and operational requirements:

| Method                  | Use Case                                                             |
|-------------------------|----------------------------------------------------------------------|
| OS-Level Startup Script | Best for cluster-wide persistent variables                           |
| Compute Engine Metadata | Works at the instance level; useful for per-node settings            |
| Spark Properties        | Best for job-specific runtime variables                              |
| Command-Line Arguments  | A fallback option when security policies restrict variable injection |
| Secrets Manager         | The most secure option for sensitive values                          |

For sensitive data such as API keys, database credentials, and encryption secrets, Google Cloud Secret Manager should always be used.

This structured approach ensures secure and flexible runtime configuration across different environments.
