DataProc - Cluster configuration

An explainer on how to best set up Google Cloud DataProc environments for Python AI and ML workflows.

This post is part of a comprehensive series on DataProc and Airflow. You can read everything in this one monster piece, or you can jump to a particular section:

  1. What is DataProc? And why do we need it?
  2. Environments, what’s shared and what’s local
  3. Environment variables
  4. Cluster configuration
  5. Cluster startup optimization
  6. Runtime variables

DataProc cluster configuration

Introduction

In many DataProc projects, jobs are executed under nearly identical conditions — they typically have low memory and CPU requirements and access the same data sources.

To avoid duplicating boilerplate configuration across multiple DAGs, a shared configuration file can be created under the /dag directory. Each environment (e.g., development, UAT, production) can have its own <env>.py file that defines a standard cluster configuration. This shared configuration can then be imported into all relevant DAGs, with project-specific overrides applied when necessary.

Example shared configuration file

from typing import Any
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator
from airflow.utils.dates import days_ago

def get_base_cluster_config() -> ClusterGenerator:
    # Shared cluster defaults; <project>, <env>, <client>, <region> and <subnet>
    # are placeholders for your own values.
    return ClusterGenerator(
        project_id=Variable.get("PROJECT"),
        region=Variable.get("REGION"),
        master_machine_type="e2-standard-2",
        worker_machine_type="e2-standard-2",
        num_workers=2,                 # DataProc needs at least two workers (see below)
        image_version="2.2-debian12",
        service_account="sa-<project>-<env>@<project>.iam.gserviceaccount.com",
        subnetwork_uri="https://www.googleapis.com/compute/v1/projects/<client>/regions/<region>/subnetworks/snet-<subnet>",
        internal_ip_only=True,
        auto_delete_ttl=1800,          # total cluster lifetime in seconds (see below)
        tags=["<project>"],
    )

def get_default_args() -> dict[str, Any]:
    # Default DAG arguments shared by every DAG in the project.
    return {
        "owner": "airflow",
        "depends_on_past": False,
        "schedule_interval": "@daily",
        "start_date": days_ago(1),
        "catchup": False,
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
    }
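With the shared file in place, project-specific overrides stay short: grab the generator, change the attributes you need, then build the config. This is a sketch only, assuming ClusterGenerator keeps its constructor arguments as attributes (the same assumption the metadata override in the DAG example later in this post relies on); the machine type and worker count shown are illustrative.

from shared.dev import get_base_cluster_config

# Start from the shared defaults, then override only what this project needs.
cluster_config = get_base_cluster_config()
cluster_config.num_workers = 4                          # heavier job: more workers
cluster_config.worker_machine_type = "e2-standard-4"    # and larger workers
CLUSTER_CONFIG = cluster_config.make()                  # build the final cluster dict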

Number of workers

The minimum number of workers (num_workers) must be greater than 1. DataProc requires at least two workers to form a fully functional cluster.

Clusters configured with only a single worker will pass initial DAG validation and begin provisioning, but will ultimately time out with an unclear error message.


Cluster timeout (auto_delete_ttl)

The auto_delete_ttl parameter is often misunderstood. It defines the total lifetime of the cluster, not an idle timeout.

If this value is set below 10 minutes (600 seconds), the DAG will validate successfully and cluster provisioning will start, but the cluster will silently terminate without producing any meaningful error messages.

Typical startup durations

For small Spark clusters with two standard workers in North American regions, observed provisioning times are:

  • Minimum startup time: Approximately 7 minutes
  • Cold start (e.g., Monday morning): Up to 15 minutes

Package installation is slow

Clusters that install common data science libraries such as NumPy, scikit-learn, and pandas will often exceed 12.5 minutes for startup, even under ideal conditions.

Poorly optimized initialization scripts — especially those with complex or unconstrained pip install commands — can easily push startup times beyond 10 minutes.
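Where packages are unavoidable, constrain the install. The sketch below assumes the public Dataproc pip-install initialization action is reachable from your subnet; the bucket path and the PIP_PACKAGES metadata key follow that action's convention, and the pinned versions are only examples.

from airflow.models import Variable
from shared.dev import get_base_cluster_config

cluster_config = get_base_cluster_config()

# Public pip-install initialization action (verify the path for your region
# and that your network policy allows access to the public bucket).
cluster_config.init_actions_uris = [
    f"gs://goog-dataproc-initialization-actions-{Variable.get('REGION')}/python/pip-install.sh"
]

# Pin exact versions so pip does no dependency resolution at cluster startup.
# Same list-of-tuples metadata form as the DAG example later in this post.
cluster_config.metadata = [
    ("PIP_PACKAGES", "numpy==1.26.4 pandas==2.1.4 scikit-learn==1.4.2"),
]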

Set cluster timeouts high

Despite natural cost-saving instincts, I strongly recommend setting the auto_delete_ttl parameter significantly higher than you might expect — 20 to 30 minutes is a reasonable default.

For DAGs involving model training jobs, you may need to extend the timeout to several hours.

Because all DAGs include a cluster teardown task using trigger_rule="all_done", a generously high timeout does not pose a risk of excessive costs. The cluster will always be cleaned up automatically, even if the job fails or gets stuck.
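For a training DAG, that can be as simple as overriding the shared default before building the config (auto_delete_ttl is expressed in seconds); the four-hour value below is only an example.

cluster_config = get_base_cluster_config()
cluster_config.auto_delete_ttl = 4 * 60 * 60  # four hours, in seconds
CLUSTER_CONFIG = cluster_config.make()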


DAG structure

With the previous configuration steps complete, the DAG itself follows a standard structure:

  1. Pull key variables from the Composer environment, including the environment name, region, and bucket.
  2. Retrieve the default cluster configuration from a shared submodule.
  3. Load default DAG arguments from the shared submodule.
  4. Assemble the DAG: cluster spin-up → PySpark job submission → cluster teardown.

Example DAG structure

from airflow import DAG
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from shared.dev import get_base_cluster_config, get_default_args

ENV = Variable.get("ENV")
PROJECT = Variable.get("PROJECT")
REGION = Variable.get("REGION")
BUCKET = Variable.get("BUCKET")
USE_CASE = "<use case>"

# Build the cluster config from the shared defaults, passing runtime metadata through.
cluster_config = get_base_cluster_config()
cluster_config.metadata = [("ENV", ENV), ("BUCKET", BUCKET)]
CLUSTER_CONFIG = cluster_config.make()
CLUSTER_NAME = f"<project>-{ENV}-{USE_CASE}-inf"

with DAG(
    default_args=get_default_args(),
    dag_id="project.env.use_case.inference",
    tags=["Inference"],
) as inference:
    infra_dataproc_cluster_up = DataprocCreateClusterOperator(
        task_id="infra_dataproc_cluster_up",
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT,
        region=REGION,
        cluster_config=CLUSTER_CONFIG,
        trigger_rule="all_done",
    )

    infer_use_case = DataprocSubmitJobOperator(
        task_id=f"infer_{USE_CASE}",
        region=REGION,
        project_id=PROJECT,
        job={
            "placement": {"cluster_name": CLUSTER_NAME},
            "pyspark_job": {
                "main_python_file_uri": f"gs://{BUCKET}/dags/code/predict.py",
            },
        },
        trigger_rule="all_success",
    )

    infra_dataproc_cluster_down = DataprocDeleteClusterOperator(
        task_id="infra_dataproc_cluster_down",
        project_id=PROJECT,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule="all_done",
    )

    infra_dataproc_cluster_up >> infer_use_case >> infra_dataproc_cluster_down

Important considerations

Cluster name restrictions

As with DAG IDs, cluster names should only contain lowercase letters, numbers, and dashes. Avoid underscores and periods, as they will silently break cluster provisioning.
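Because the failure is silent, it is worth validating the name before the cluster is requested. A minimal guard, with an illustrative helper name (GCP also enforces a length limit not shown here):

import re

# Lowercase letter first, then lowercase letters, digits or dashes only.
CLUSTER_NAME_RE = re.compile(r"^[a-z][a-z0-9-]*$")

def assert_valid_cluster_name(name: str) -> str:
    if not CLUSTER_NAME_RE.match(name):
        raise ValueError(f"Invalid DataProc cluster name: {name!r}")
    return name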

Minimal dependencies in DAG files

DAG files should avoid direct imports of business logic. Since DAGs are parsed by the Composer environment (a minimal Python environment), adding dependencies requires formal approval and complicates deployment.

Code packaging

Business logic should be packaged as a .whl, .zip, or .tar.gz file and referenced using the python_file_uris argument in the job configuration.
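In practice the GCS path of that artifact sits alongside the other DAG constants; the wheel name below is a placeholder for whatever your build pipeline uploads, and PKG is the variable referenced in the job configuration in the next section.

# Hypothetical artifact location -- adjust the path and wheel name to match
# what your CI pipeline uploads.
PKG = f"gs://{BUCKET}/dags/code/my_package-0.1.0-py3-none-any.whl"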


Job submission and argument handling

Since Airflow and Composer operate independently of the DataProc/Spark cluster, the DAG cannot import from, or share Python objects with, the job code. Instead, all arguments must be passed explicitly in the DataprocSubmitJobOperator job configuration as a valid DataProc job dictionary.

job={
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": f"gs://{BUCKET}/dags/code/predict.py",
        "python_file_uris": [PKG],
        "args": [
            "--site", site,
            "--start_date", PARAMS.start_date.strftime("%Y-%m-%d %H:%M"),
            "--end_date", PARAMS.end_date.strftime("%Y-%m-%d %H:%M"),
        ],
    },
}

Reworking existing code

Business logic must be restructured to accept command-line arguments using argparse, rather than relying on imported configurations.

import argparse
from datetime import datetime

...  # business-logic imports (ConfigStore, DataRepository, predict) elided

def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Predict anomalies for XYZ use case.")

    parser.add_argument("--site", required=True, help="The site location ID.")
    parser.add_argument("--start_date", required=True, help="The start date to infer over.")
    parser.add_argument("--end_date", required=True, help="The end date of the inference window.")

    return parser.parse_args()

def main():
    args = parse_args()
    conf = ConfigStore(use_case="<use case>")
    repository = DataRepository(
        api=conf.api,
        dataset=conf.dataset,
    )

    # Pre-existing entry point
    predict(
        use_case="<use case>",
        site=args.site,
        start_date=datetime.strptime(args.start_date, "%Y-%m-%d %H:%M"),
        end_date=datetime.strptime(args.end_date, "%Y-%m-%d %H:%M"),
        repo=repository,
    )


if __name__ == "__main__":
    main()
