DataProc - Cluster configuration

An explainer on how to best set up Google Cloud DataProc environments for Python AI and ML workflows.

This post is part of a comprehensive series on DataProc and Airflow. You can read everything in this one monster piece, or jump straight to a particular section:

  1. What is DataProc? And why do we need it?
  2. Environments, what’s shared and what’s local
  3. Environment variables
  4. Cluster configuration
  5. Cluster startup optimization
  6. Runtime variables

DataProc cluster configuration

Introduction

In many DataProc projects, jobs are executed under nearly identical conditions — they typically have low memory and CPU requirements and access the same data sources.

To avoid duplicating boilerplate configuration across multiple DAGs, a shared configuration file can be created under the /dags directory. Each environment (e.g., development, UAT, production) gets its own <env>.py file that defines a standard cluster configuration. This shared configuration can then be imported into every relevant DAG, with project-specific overrides applied where necessary.

Example shared configuration file

from typing import Any

from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator
from airflow.utils.dates import days_ago


def get_base_cluster_config() -> ClusterGenerator:
    """Standard small cluster shared by all DAGs in this environment."""
    return ClusterGenerator(
        project_id=Variable.get("PROJECT"),
        region=Variable.get("REGION"),
        master_machine_type="e2-standard-2",
        worker_machine_type="e2-standard-2",
        num_workers=2,  # DataProc needs at least two workers (see below)
        image_version="2.2-debian12",
        service_account="sa-<project>-<env>@<project>.iam.gserviceaccount.com",
        subnetwork_uri="https://www.googleapis.com/compute/v1/projects/<client>/regions/<region>/subnetworks/snet-<subnet>",
        internal_ip_only=True,
        auto_delete_ttl=1800,  # total cluster lifetime in seconds, not an idle timeout
        tags=["<project>"],
    )


def get_default_args() -> dict[str, Any]:
    """Task-level defaults shared by all DAGs.

    Note: schedule_interval and catchup are DAG-level settings and are
    silently ignored inside default_args, so pass them to DAG() directly.
    """
    return {
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": days_ago(1),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
    }

Number of workers

The minimum number of workers (num_workers) must be greater than 1. DataProc requires at least two workers to form a fully functional cluster.

Clusters configured with only a single worker will pass initial DAG validation and begin provisioning, but will ultimately time out with an unclear error message.
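
To fail fast at DAG-parse time instead of waiting for that timeout, a small guard can live in the shared configuration module. This is a minimal sketch: the get_cluster_config name is hypothetical, and it wraps the get_base_cluster_config helper shown above.

def get_cluster_config(num_workers: int = 2) -> ClusterGenerator:
    # A standard DataProc cluster needs at least two workers, so reject
    # bad values here rather than timing out during provisioning.
    if num_workers < 2:
        raise ValueError("DataProc requires num_workers >= 2 for a standard cluster")
    config = get_base_cluster_config()
    config.num_workers = num_workers
    return config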


Cluster timeout (auto_delete_ttl)

The auto_delete_ttl parameter is often misunderstood. It defines the total lifetime of the cluster, not an idle timeout.

If this value is set below 10 minutes (600 seconds), the DAG will validate successfully and cluster provisioning will start, but the cluster will silently terminate without producing any meaningful error messages.

Typical startup durations

For small Spark clusters with two standard workers in North American regions, observed provisioning times are:

  • Minimum startup time: Approximately 7 minutes
  • Cold start (e.g., Monday morning): Up to 15 minutes

Package installation is slow

Clusters that install common data science libraries such as NumPy, scikit-learn, and pandas will often take more than 12.5 minutes to start, even under ideal conditions.

Poorly optimized initialization scripts, especially those with complex or unconstrained pip install commands, can push startup times well beyond that.
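
One mitigation is to pin exact package versions so pip's resolver has no work to do. The sketch below assumes the public Dataproc pip-install initialization action and the shared get_base_cluster_config helper; the REGION variable and the package pins are illustrative.

config = get_base_cluster_config()

# Public initialization action maintained by Google that pip-installs
# whatever is listed in the PIP_PACKAGES metadata key at startup.
config.init_actions_uris = [
    f"gs://goog-dataproc-initialization-actions-{REGION}/python/pip-install.sh"
]
config.metadata = [
    ("PIP_PACKAGES", "numpy==1.26.4 pandas==2.1.4 scikit-learn==1.4.2")
]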

Set cluster timeouts high

Despite natural cost-saving instincts, I strongly recommend setting the auto_delete_ttl parameter significantly higher than you might expect: 20 to 30 minutes (1200 to 1800 seconds) is a reasonable default.

For DAGs involving model training jobs, you may need to extend the timeout to several hours.
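
Because the shared module returns the generator object itself, extending the timeout is a per-DAG one-liner. A minimal sketch, assuming the get_base_cluster_config helper above and an illustrative six-hour budget:

cluster_config = get_base_cluster_config()

# Training can run for hours; give the cluster a six-hour total lifetime
# instead of the shared 30-minute default (the value is in seconds).
cluster_config.auto_delete_ttl = 6 * 60 * 60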

Because all DAGs include a cluster teardown task using trigger_rule="all_done", a generously high timeout does not pose a risk of excessive costs. The cluster will always be cleaned up automatically, even if the job fails or gets stuck.


DAG structure

With the previous configuration steps complete, the DAG itself follows a standard structure:

  1. Pull key variables from the Composer environment, including the environment name, region, and bucket.
  2. Retrieve the default cluster configuration from a shared submodule.
  3. Load default DAG arguments from the shared submodule.
  4. Assemble the DAG: cluster spin-up → PySpark job submission → cluster teardown.

Example DAG structure

from airflow import DAG
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)

from shared.dev import get_base_cluster_config, get_default_args

ENV = Variable.get("ENV")
PROJECT = Variable.get("PROJECT")
REGION = Variable.get("REGION")
BUCKET = Variable.get("BUCKET")
USE_CASE = "<use case>"  # placeholder for the actual use-case name

cluster_config = get_base_cluster_config()
cluster_config.metadata = [("ENV", ENV), ("BUCKET", BUCKET)]
CLUSTER_CONFIG = cluster_config.make()
CLUSTER_NAME = f"<project>-{ENV}-{USE_CASE}-inf"

with DAG(
    dag_id="project.env.use_case.inference",
    default_args=get_default_args(),
    schedule_interval="@daily",  # DAG-level setting, not a default_args entry
    catchup=False,
    tags=["Inference"],
) as dag:  # a distinct name, so the task below doesn't shadow the DAG object
    infra_dataproc_cluster_up = DataprocCreateClusterOperator(
        task_id="infra_dataproc_cluster_up",
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT,
        region=REGION,
        cluster_config=CLUSTER_CONFIG,
    )

    inference = DataprocSubmitJobOperator(
        task_id=f"infer_{USE_CASE}",
        region=REGION,
        project_id=PROJECT,
        job={
            "placement": {"cluster_name": CLUSTER_NAME},
            "pyspark_job": {
                "main_python_file_uri": f"gs://{BUCKET}/dags/code/predict.py",
            },
        },
        trigger_rule="all_success",
    )

    # "all_done" guarantees teardown even if the inference task fails.
    infra_dataproc_cluster_down = DataprocDeleteClusterOperator(
        task_id="infra_dataproc_cluster_down",
        project_id=PROJECT,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule="all_done",
    )

    infra_dataproc_cluster_up >> inference >> infra_dataproc_cluster_down

Important considerations

Cluster name restrictions

Cluster names should contain only lowercase letters, numbers, and dashes. Avoid underscores, periods, and uppercase characters, as they will silently break cluster provisioning.
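
For a parse-time sanity check, the rule can be encoded as a regular expression. A sketch, assuming the usual GCE resource-name pattern:

import re

# Lowercase letters, digits, and hyphens; must start with a letter and
# must not end with a hyphen.
VALID_CLUSTER_NAME = re.compile(r"^[a-z]([-a-z0-9]*[a-z0-9])?$")

assert VALID_CLUSTER_NAME.match("myproject-dev-usecase-inf")
assert not VALID_CLUSTER_NAME.match("my_project.dev")  # underscores and periods fail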

Minimal dependencies in DAG files

DAG files should avoid direct imports of business logic. Since DAGs are parsed by the Composer environment (a minimal Python environment), adding dependencies requires formal approval and complicates deployment.

Code packaging

Business logic should be packaged as a .whl, .zip, or .tar.gz file and referenced using the python_file_uris argument in the job configuration.
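
The job example in the next section references the uploaded archive through a PKG variable, which could be defined along these lines (the wheel name and path are illustrative):

# GCS URI of the packaged business logic, uploaded alongside the DAG code.
PKG = f"gs://{BUCKET}/dags/code/my_package-0.1.0-py3-none-any.whl"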


Job submission and argument handling

Since Airflow and Composer operate independently from the DataProc/Spark cluster, the DAG cannot import the job's code and call it directly. Instead, all arguments must be passed explicitly through the DataprocSubmitJobOperator job configuration as a valid DataProc job dictionary.

job={
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": f"gs://{BUCKET}/dags/code/predict.py",
        "python_file_uris": [PKG],
        "args": [
            "--site", site,
            "--start_date", PARAMS.start_date.strftime("%Y-%m-%d %H:%M"),
            "--end_date", PARAMS.end_date.strftime("%Y-%m-%d %H:%M"),
        ],
    },
}

Reworking existing code

Business logic must be restructured to accept command-line arguments using argparse, rather than relying on imported configurations.

import argparse
from datetime import datetime

...

def parse_args() -> argparse.Namespace:  
    parser = argparse.ArgumentParser(description="Predict anomalies for XYZ use case.")  

    parser.add_argument("--site", required=True, help="The site location ID.")  
    parser.add_argument("--start_date", required=True, help="The start date to infer over.")  
    parser.add_argument("--end_date", required=True, help="The end date of the inference window.")  

    return parser.parse_args()

def main():
    args = parse_args()
    conf = ConfigStore(use_case="<use case>")
    repository = DataRepository(
        api=conf.api,
        dataset=conf.dataset,
    )

    # Pre-existing entry point
    predict(
        use_case="<use case>",
        site=args.site,
        start_date=datetime.strptime(args.start_date, "%Y-%m-%d %H:%M"),
        end_date=datetime.strptime(args.end_date, "%Y-%m-%d %H:%M"),
        repo=repository,
    )


if __name__ == "__main__":
    main()
