DataProc - Variables

An explainer on how variables can be injected into Google Cloud Composer and DataProc environments.

This post is part of a comprehensive series on DataProc and Airflow. You can read everything in this one monster piece, or you can jump to a particular section:

  1. What is DataProc? And why do we need it?
  2. Environments, what’s shared and what’s local
  3. Environment variables
  4. Cluster configuration
  5. Cluster startup optimization
  6. Runtime variables

Passing environment variables in DataProc and Composer

Executive summary

A common design pattern for orchestrating AI and ML pipelines leverages Google Cloud DataProc to execute Spark-based workflows, offering scalable compute resources for distributed data processing.

According to Google’s marketing materials, DataProc offers:

“Serverless deployment, logging, and monitoring let you focus on your data and analytics, not on your infrastructure.”

As data practitioners, we may not always have access to all cloud resources, including direct access to DataProc logs, cluster monitoring tools, or SSH access to cluster instances. Security policies or organizational controls often restrict these capabilities, requiring us to rely on orchestration tools like Airflow and alternative logging or monitoring approaches.

In this pattern, Google Cloud Composer, a managed version of Apache Airflow, orchestrates and schedules the Spark jobs. Each Composer environment is linked to a dedicated Google Cloud Storage (GCS) bucket, which is also mounted as a local filesystem on the DataProc cluster when it starts.

Standard workflow

The typical workflow follows these steps:

  1. Initialize a Directed Acyclic Graph (DAG) in Airflow.
  2. The initial node provisions a new Spark cluster.
  3. Subsequent nodes submit PySpark jobs to the cluster.
  4. The final node tears down the cluster.

The Airflow DataProc operators make it possible to orchestrate this entire workflow using Python code that runs within the Composer environment.

Here’s an example DAG that reflects this pattern:

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)

# CLUSTER_NAME, PROJECT, REGION, CLUSTER_CONFIG, BUCKET, use_case and
# get_default_args() are assumed to be defined elsewhere in the file.

with DAG(
    default_args=get_default_args(),
    dag_id="project.env.use_case.inference",
    tags=["Inference"],
) as dag:

    # Create the DataProc cluster
    infra_dataproc_cluster_up = DataprocCreateClusterOperator(
        task_id="infra_dataproc_cluster_up",
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT,
        region=REGION,
        cluster_config=CLUSTER_CONFIG,
        trigger_rule="all_done",
    )

    # ML task
    inference = DataprocSubmitJobOperator(
        task_id=f"infer_{use_case}",
        region=REGION,
        project_id=PROJECT,
        job={
            "placement": {"cluster_name": CLUSTER_NAME},
            "pyspark_job": {
                "main_python_file_uri": f"gs://{BUCKET}/dags/code/predict.py",
            },
        },
        trigger_rule="all_success",
    )

    # Teardown the cluster
    infra_dataproc_cluster_down = DataprocDeleteClusterOperator(
        task_id="infra_dataproc_cluster_down",
        project_id=PROJECT,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule="all_done",
    )

    infra_dataproc_cluster_up >> inference >> infra_dataproc_cluster_down

WARNING - Naming constraints

Airflow DAG IDs may contain letters, numbers, dots, dashes, and underscores, but DataProc cluster names are stricter: lowercase letters, numbers, and hyphens only, starting with a letter. Avoid underscores in cluster names in particular, as they can cause silent failures in Google Cloud operations.
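
For instance (illustrative names only):

CLUSTER_NAME = "inference-cluster-dev"    # valid: lowercase letters, digits, hyphens
# CLUSTER_NAME = "inference_cluster_dev"  # underscores can fail cluster creation silently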

The DAG file (e.g. dag.py) is uploaded to the Composer dags directory in the associated GCS bucket (gs://<BUCKET>/dags). Airflow polls this directory roughly every minute, and new files automatically appear in the UI. When updating a DAG, it’s good practice to confirm that the new code is visible under the Code tab before triggering a run.
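
The PySpark job referenced by the DAG, predict.py, lives in the same bucket. As a rough sketch of what such a job might contain (the paths and application name are placeholders, and the model-scoring step is elided):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("inference").getOrCreate()

# DataProc clusters include the GCS connector, so Spark can read and
# write gs:// paths directly.
features = spark.read.parquet("gs://example-bucket/data/features.parquet")

predictions = features  # ... model scoring would happen here ...

predictions.write.mode("overwrite").parquet(
    "gs://example-bucket/data/predictions.parquet"
)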


Managing Composer variables

To enhance security and maintain consistency across environments, many variables are injected into the DAG runtime environment directly from the Composer instance. This allows the DAG to dynamically retrieve these values using either Python’s built-in os.getenv() or Airflow’s airflow.models.Variable class.
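
As a minimal sketch of both retrieval styles inside a DAG file (the variable names used here are hypothetical):

import os

from airflow.models import Variable

# Environment variables set on the Composer environment itself:
bucket = os.getenv("DATA_BUCKET", "fallback-bucket")

# Airflow Variables, managed under Admin > Variables or via variables.json:
model_version = Variable.get("model_version", default_var="latest")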

To support reproducibility and deployment automation, projects may include a variables.json file in their source repository. This file can be deployed to Composer through several methods.
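
Airflow expects this file to be a JSON object of key-value pairs, for example (the keys shown are hypothetical):

{
    "model_version": "latest",
    "inference_batch_size": "512"
}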

Deploying variables

Method 1: Deploy to the GCS bucket

Airflow supports automatically importing variables from a variables.json file placed in gs://<BUCKET>/data/. Adding a command to the deployment pipeline, such as .gitlab-ci.yml or .github/workflows/release.yml, ensures the file is copied during deployment:

gsutil cp infra/variables.json gs://<BUCKET>/data/variables.json

At present, automatic parsing of this file is restricted, though this may change in higher environments with appropriate approval.

Method 2: Import via command line

The variables can also be directly imported using the gcloud CLI:

gcloud composer environments run \
    <ENVIRONMENT> \
    --location=<REGION> \
    variables import \
    -- \
    /home/airflow/gcs/data/variables.json

This approach requires the CI/CD runner to have permissions to authenticate, upload the file, and apply the variables. Currently, these permissions are not granted to the service account.

Method 3: Manual import via web interface

Variables can also be managed directly through the Airflow UI.

  • Navigate to Admin > Variables.
  • Manually enter key-value pairs or upload the variables.json file directly.

[Screenshot: the Airflow variables menu]

The uploaded file’s variables will appear in the table.

[Screenshot: the Airflow variables screen]

Preferred approach

Due to the infrequent nature of variable changes and the current lack of automation permissions, manually uploading variables through the Airflow UI is the preferred method. This also improves operational security, as no sensitive bucket names or regions need to be stored in the source repository.

This approach balances flexibility, security, and operational control while ensuring variables are correctly injected into the Composer environment at runtime.
