
Python Beam and Dataflow to write data to BigQuery with a custom Docker image


Dataflow build and run

A basic example of how to develop an Apache Beam pipeline written in Python that writes data to BigQuery, then build it as a flex template and run it as a Dataflow job.

This example avoids the complexities of reading from source systems or storage and simply creates synthetic data.

Working from these examples:

Setup

Reference code is in the following GitHub repo: https://github.com/mortie23/beam-jdbc-testing/tree/master/docker.

The GCP prerequisites are in the following sections.

Project

First, the basic GCP project naming convention we'll use includes an environment:

prj-<org>-<env>-<business-unit>

In this example, the Organisation (<org>) is xyz and the Business unit (<business-unit>) is fruit.

The project requires the following GCP resources:

  • Artifact Registry (Docker repository)
  • Dataflow API
  • Cloud Storage
  • BigQuery

IAM

  • Service account (gsvc-xyz-<env>-fruit@<project_id>.iam.gserviceaccount.com) with roles
    • Artifact Registry Repository Administrator
    • Storage Object Admin
    • Dataflow Worker
    • Workflows Invoker
    • BigQuery custom role (permissions)
      • bigquery.tables.create
      • bigquery.tables.get
      • bigquery.tables.update
      • bigquery.tables.updateData

Cloud storage

Cloud storage bucket (bkt-xyz-<env>-fruit) with subfolders:

📁 dataflow/
│   ├── 📁 flex-template/
│   ├── 📁 staging/
│   ├── 📁 logs/
│   ├── 📁 temp/

BigQuery

Make sure the target table within the appropriate dataset (fruit) exists.

create table fruit.hellofruit (
  name string
  , test_number int64
)
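
For reference, this is a minimal sketch of what a hellofruit.py pipeline could look like (the actual script in the repo may differ): it creates a few synthetic rows matching this schema and writes them to the table named by the output_table parameter.

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    # output_table is the parameter declared in metadata.json, e.g. <project_id>:fruit.hellofruit
    parser.add_argument("--output_table", default="fruit.hellofruit")
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
        (
            pipeline
            # Synthetic rows matching the fruit.hellofruit schema
            | "CreateRows" >> beam.Create(
                [
                    {"name": "apple", "test_number": 1},
                    {"name": "banana", "test_number": 2},
                    {"name": "cherry", "test_number": 3},
                ]
            )
            # The table was created above, so never create it here, only append
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                known_args.output_table,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )


if __name__ == "__main__":
    run()

You can run this locally with the DirectRunner (passing --temp_location for the BigQuery load files) to check the write before packaging it as a flex template.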

Build

The build step involves building a Docker container image and an associated flex template file.

The Dockerfile:

  1. starts FROM the base image
  2. sets the required environment (ENV) variables
  3. COPYs the source files to the Docker image
  4. RUNs operating system updates and package installs, as well as the Python package installs
  5. defines the ENTRYPOINT

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/hellofruit.py"

COPY . /template

# We could get rid of installing libffi-dev and git, or we could leave them.
RUN apt-get update \
    && apt-get install -y libffi-dev git \
    && rm -rf /var/lib/apt/lists/* \
    # Upgrade pip and install the requirements.
    && pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE \
    # Download the requirements to speed up launching the Dataflow job.
    && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE

# Since we already downloaded all the dependencies, there's no need to rebuild everything.
ENV PIP_NO_DEPS=True

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
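
The requirements.txt referenced by FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE is not shown here; at a minimum it needs the Beam SDK with the GCP extras (the repo may pin a version and add other libraries):

apache-beam[gcp]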

Options

Using the build script

The build script will run all the steps for you and do the parameter replacement from the config file.

The config file sets out the parameters for the Dataflow job:

project_id: 'prj-xyz-<env>-fruit'
artifact_registry: 'australia-southeast1-docker.pkg.dev'
artifact_repo_docker: 'rpo-xyz-<env>-fruit-dkr'
storage_bucket: 'bkt-xyz-<env>-fruit'
service_account: 'gsvc-xyz-<env>-fruit'
location: 'australia-southeast1'

Then run the build script for the target environment:

./dataflow-build.sh --env dev

Manual step by step

This is the same process, manually performing the steps from the build script mentioned above.

Build the Docker image on your local client development machine, tagging the image with the full reference to the GCP Artifact registry repository.

docker build . -t australia-southeast1-docker.pkg.dev/<project_id>/<artifact_repository_docker>/dataflow/hellofruit:0.1

The build step will take quite a bit of time, since it has to pull the base image, copy the scripts, and install the dependencies.

Before you try to push the container image to the Artifact Registry repository, you need to configure Docker to authenticate to the registry with your gcloud credentials.

gcloud auth configure-docker australia-southeast1-docker.pkg.dev

Push the image to the registry.

docker push australia-southeast1-docker.pkg.dev/<project_id>/<artifact_repository_docker>/dataflow/hellofruit:0.1

Now we can build a flex template.

gcloud dataflow flex-template build "gs://<storage_bucket>/dataflow/flex-template/hellofruit.json" \
     --image "australia-southeast1-docker.pkg.dev/<project_id>/<artifact_repository_docker>/dataflow/hellofruit:0.1" \
     --sdk-language "PYTHON" \
     --metadata-file "metadata.json"

This should run quite quickly and result in a JSON file in the bucket location you requested. The metadata block in the container spec comes from the metadata.json file passed with --metadata-file.

Successfully saved container spec in flex template file.
Template File GCS Location: gs://<storage_bucket>/dataflow/flex-template/hellofruit.json
Container Spec:

{
    "defaultEnvironment": {},
    "image": "australia-southeast1-docker.pkg.dev/<project-id>/<artifact_repository_docker>/dataflow/hellofruit:0.1",
    "metadata": {
        "description": "Hello fruit Python flex template.",
        "name": "Hello fruit",
        "parameters": [
            {
                "helpText": "Name of the BigQuery output table name.",
                "isOptional": true,
                "label": "BigQuery output table name.",
                "name": "output_table",
                "regexes": [
                    "([^:]+:)?[^.]+[.].+"
                ]
            }
        ]
    },
    "sdkInfo": {
        "language": "PYTHON"
    }
}

Using the gcloud CLI

This method will do the build and push as well as create the flex template JSON file. However, in certain GCP setups it may not run: some of what it does behind the scenes requires permissions that may not be granted.

gcloud dataflow flex-template build "gs://<storage_bucket>/dataflow/flex/hellofruit.json" \
     --image-gcr-path "australia-southeast1-docker.pkg.dev/<project_id>/<artifact_repository_docker>/dataflow/hellofruit:latest" \
     --staging-location "gs://<storage_bucket>/dataflow/staging" \
     --temp-location "gs://<storage_bucket>/dataflow/temp" \
     --gcs-log-dir "gs://<storage_bucket>/dataflow/logs" \
     --sdk-language "PYTHON" \
     --flex-template-base-image "PYTHON3" \
     --metadata-file "metadata.json" \
     --py-path "." \
     --env "FLEX_TEMPLATE_PYTHON_PY_FILE=hellofruit.py" \
     --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt" \
     --log-http \
     --verbosity debug

Example error log

These are example logs that you might see if this fails because, for example, you do not have storage bucket create access. Building it this way uses Cloud Build, which will attempt to create a temporary bucket with the naming convention <project_id>_cloudbuild.

DEBUG: Running [gcloud.dataflow.flex-template.build]

uri: https://storage.googleapis.com/storage/v1/b/<project_id>_cloudbuild?alt=json
DEBUG: Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG: https://storage.googleapis.com:443 "GET /storage/v1/b/<project_id>_cloudbuild?alt=json HTTP/1.1" 404 247

{
  "error": {
    "code": 404,
    "message": "The specified bucket does not exist.",
    "errors": [
      {
        "message": "The specified bucket does not exist.",
        "domain": "global",
        "reason": "notFound"
      }
    ]
  }
}

uri: https://storage.googleapis.com/storage/v1/b?alt=json&enableObjectRetention=False&project=<project_id>
DEBUG: Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG: https://storage.googleapis.com:443 "POST /storage/v1/b?alt=json&enableObjectRetention=False&project=<project_id> HTTP/1.1" 403 546
{
  "error": {
    "code": 403,
    "message": "<user>@<domain> does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist).",
    "errors": [
      {
        "message": "<user>@<domain> does not have storage.buckets.create access to the Google Cloud project. Permission 'storage.buckets.create' denied on resource (or it may not exist).",
        "domain": "global",
        "reason": "forbidden"
      }
    ]
  }
}

DEBUG: (gcloud.dataflow.flex-template.build) The user is forbidden from accessing the bucket [<project_id>_cloudbuild]. Please check your organization's policy or if the user has the "serviceusage.services.use" permission. Giving the user Owner, Editor, or Viewer roles may also fix this issue. Alternatively, use the --no-source option and access your source code via a different method.

googlecloudsdk.command_lib.builds.submit_util.BucketForbiddenError: The user is forbidden from accessing the bucket [<project_id>_cloudbuild]. Please check your organization's policy or if the user has the "serviceusage.services.use" permission. Giving the user Owner, Editor, or Viewer roles may also fix this issue. Alternatively, use the --no-source option and access your source code via a different method.
ERROR: (gcloud.dataflow.flex-template.build) The user is forbidden from accessing the bucket [<project_id>_cloudbuild]. Please check your organization's policy or if the user has the "serviceusage.services.use" permission. Giving the user Owner, Editor, or Viewer roles may also fix this issue. Alternatively, use the --no-source option and access your source code via a different method.

Run your custom flex template

Running a Dataflow job using a built flex template and Docker container

Options

Using the Run script

Like the build script, the run script will run all steps for you and do parameter replacement from the config file.

./dataflow-run.sh --env dev --name hellofruit
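
Under the hood this is presumably equivalent to a gcloud dataflow flex-template run command along these lines (a sketch only, assuming the bucket layout and service account naming from the setup section):

gcloud dataflow flex-template run "hellofruit-test-01" \
    --template-file-gcs-location "gs://<storage_bucket>/dataflow/flex-template/hellofruit.json" \
    --region "australia-southeast1" \
    --service-account-email "gsvc-xyz-<env>-fruit@<project_id>.iam.gserviceaccount.com" \
    --staging-location "gs://<storage_bucket>/dataflow/staging" \
    --temp-location "gs://<storage_bucket>/dataflow/temp" \
    --parameters output_table="<project_id>:fruit.hellofruit"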

Dataflow UI

Using the Dataflow user interface, you can use Create job from template and browse to the custom template JSON file that was created in the build step.

Given that our metadata.json configuration contained a parameter named output_table, the first optional parameter shown is the output table.

Results

When the Dataflow job runs successfully, you should see a job graph like this:

Checking the target BigQuery table shows rows successfully written to it (the result of multiple successful runs).

Notes

I personally found the development and debugging experience for this process very atypical. Nothing seemed to run smoothly and there were many error messages that were very difficult to debug. Lack of community support and lack of Stack Overflow questions and answers was definitely noticeable compared to other coding frameworks.

One example: I had it all working one day, and the next day it stopped working with the following error message. I found a post suggesting this could have many causes, and one suggestion was to change the location. I changed to a US location and it worked, although this might not be an option for all users.

Failed to start the VM, launcher-, used for launching because of
status code: UNAVAILABLE,
reason: One or more operations had an error:
  'operation----': [UNAVAILABLE] 'HTTP_503'..