

Cloud Composer: Copying BigQuery Tables Across Different Locations

1 hour Free

GSP283

Google Cloud Self-Paced Labs

Overview

In this advanced lab, you will learn how to create and run an Apache Airflow workflow in Cloud Composer that completes the following tasks:

  • Reads from a config file the list of tables to copy
  • Exports the list of tables from a BigQuery dataset located in US to Cloud Storage
  • Copies the exported tables from US to EU Cloud Storage buckets
  • Imports the list of tables into the target BigQuery Dataset in EU


Setup

Before you click the Start Lab button

Read these instructions. Labs are timed and you cannot pause them. The timer, which starts when you click Start Lab, shows how long Google Cloud resources will be made available to you.

This Qwiklabs hands-on lab lets you do the lab activities yourself in a real cloud environment, not in a simulation or demo environment. It does so by giving you new, temporary credentials that you use to sign in and access Google Cloud for the duration of the lab.

What you need

To complete this lab, you need:

  • Access to a standard internet browser (Chrome browser recommended).
  • Time to complete the lab.

Note: If you already have your own personal Google Cloud account or project, do not use it for this lab.

Note: If you are using a Pixelbook, open an Incognito window to run this lab.

How to start your lab and sign in to the Google Cloud Console

  1. Click the Start Lab button. If you need to pay for the lab, a pop-up opens for you to select your payment method. On the left is a panel populated with the temporary credentials that you must use for this lab.

    Open Google Console

  2. Copy the username, and then click Open Google Console. The lab spins up resources, and then opens another tab that shows the Sign in page.

    Sign in

    Tip: Open the tabs in separate windows, side-by-side.

  3. In the Sign in page, paste the username that you copied from the Connection Details panel. Then copy and paste the password.

    Important: You must use the credentials from the Connection Details panel. Do not use your Qwiklabs credentials. If you have your own Google Cloud account, do not use it for this lab, to avoid incurring charges.

  4. Click through the subsequent pages:

    • Accept the terms and conditions.
    • Do not add recovery options or two-factor authentication (because this is a temporary account).
    • Do not sign up for free trials.

After a few moments, the Cloud Console opens in this tab.

Activate Cloud Shell

Cloud Shell is a virtual machine that is loaded with development tools. It offers a persistent 5 GB home directory and runs on Google Cloud. Cloud Shell provides command-line access to your Google Cloud resources.

In the Cloud Console, in the top right toolbar, click the Activate Cloud Shell button.

Cloud Shell icon

Click Continue.

cloudshell_continue.png

It takes a few moments to provision and connect to the environment. When you are connected, you are already authenticated, and the project is set to your PROJECT_ID. For example:

Cloud Shell Terminal

gcloud is the command-line tool for Google Cloud. It comes pre-installed on Cloud Shell and supports tab-completion.

You can list the active account name with this command:

gcloud auth list

(Output)

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)

(Example output)

Credentialed accounts:
 - google1623327_student@qwiklabs.net

You can list the project ID with this command:

gcloud config list project

(Output)

[core]
project = <project_ID>

(Example output)

[core]
project = qwiklabs-gcp-44776a13dea667a6

Check project permissions

Before you begin your work on Google Cloud, you need to ensure that your project has the correct permissions within Identity and Access Management (IAM).

  1. In the Google Cloud console, on the Navigation menu (nav-menu.png), click IAM & Admin > IAM.

  2. Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Home.

check-sa.png

If the account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.

  • In the Google Cloud console, on the Navigation menu, click Home.

  • Copy the project number (e.g. 729328892908).

  • On the Navigation menu, click IAM & Admin > IAM.

  • At the top of the IAM page, click Add.

  • For New members, type:

{project-number}-compute@developer.gserviceaccount.com

Replace {project-number} with your project number.

  • For Role, select Project (or Basic) > Editor. Click Save.

add-sa.png
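If you prefer the command line, the same role binding can also be added from Cloud Shell. This is an optional sketch, not a graded step; it assumes the GOOGLE_CLOUD_PROJECT variable that Cloud Shell sets for you, and you still need to substitute {project-number} with your project number:

gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT \
    --member="serviceAccount:{project-number}-compute@developer.gserviceaccount.com" \
    --role="roles/editor"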

Create Cloud Composer environment

First, create a Cloud Composer environment by clicking on Composer in the Navigation menu:

composer.png

Then click Create environment.

Set the following parameters for your environment:

  • Name: composer-advanced-lab
  • Location: us-central1
  • Zone: us-central1-a

Leave all other settings as default. Click Create.
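If you would rather work from Cloud Shell, a roughly equivalent environment can be created with the gcloud CLI. This is an optional sketch; the console steps above are all the lab requires:

gcloud composer environments create composer-advanced-lab \
    --location us-central1 \
    --zone us-central1-a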

The environment creation process is completed when the green checkmark displays to the left of the environment name on the Environments page in the Cloud Console.

It can take up to 20 minutes for the environment to complete the setup process. Move on to the next section - Create Cloud Storage buckets and BigQuery dataset.

Click Check my progress to verify the objective.

Create Cloud Composer environment.

Create Cloud Storage buckets

Create two Cloud Storage Multi-Regional buckets. Give each bucket a universally unique name that includes the location as a suffix:

  • one located in the US as source (e.g. 6552634-us)
  • the other located in EU as destination (e.g. 6552634-eu)

These buckets will be used to copy the exported tables across locations, i.e., US to EU.
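You can create the buckets from the Cloud Console or from Cloud Shell with gsutil. For example, using the hypothetical unique ID 6552634 from above (substitute your own):

gsutil mb -l US gs://6552634-us
gsutil mb -l EU gs://6552634-eu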

Click Check my progress to verify the objective.

Create two Cloud Storage buckets.

BigQuery destination dataset

Create the destination BigQuery Dataset in EU from the BigQuery new web UI.

Go to Navigation menu > BigQuery.

The Welcome to BigQuery in the Cloud Console message box opens. This message box provides a link to the quickstart guide and lists UI updates.

Click Done.

Then click on your Qwiklabs project ID:

create_dataset.png

Click Create Dataset. Use the name nyc_tlc_EU and Data location EU.

nyc_tlc_eu_dataset.png

Click Create dataset.
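If you prefer the command line, a comparable dataset can also be created from Cloud Shell with the bq tool. This optional sketch assumes the GOOGLE_CLOUD_PROJECT variable that Cloud Shell sets for you:

bq --location=EU mk --dataset $GOOGLE_CLOUD_PROJECT:nyc_tlc_EU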

Click Check my progress to verify the objective.

Create a dataset.

Airflow and core concepts, a brief introduction

While your environment is building, read about the sample file you'll be using in this lab.

Airflow is a platform to programmatically author, schedule and monitor workflows.

Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.

Core concepts

DAG - A Directed Acyclic Graph is a collection of tasks, organised to reflect their relationships and dependencies.

Operator - The description of a single task; it is usually atomic. For example, the BashOperator is used to execute a bash command.

Task - A parameterised instance of an Operator; a node in the DAG.

Task Instance - A specific run of a task; characterised as: a DAG, a Task, and a point in time. It has an indicative state: running, success, failed, skipped, ...

The rest of the Airflow concepts are covered in the Apache Airflow documentation.

Defining the workflow

Cloud Composer workflows are composed of DAGs (Directed Acyclic Graphs). The code shown in bq_copy_across_locations.py is the workflow code, also referred to as the DAG. Open the file now to see how it is built. The following sections take a detailed look at some of the key components of the file.

To orchestrate all the workflow tasks, the DAG imports the following operators:

  1. DummyOperator: Creates Start and End dummy tasks for better visual representation of the DAG.
  2. BigQueryToCloudStorageOperator: Exports BigQuery tables to Cloud Storage buckets using Avro format.
  3. GoogleCloudStorageToGoogleCloudStorageOperator: Copies files across Cloud Storage buckets.
  4. GoogleCloudStorageToBigQueryOperator: Imports tables from Avro files in the Cloud Storage bucket into BigQuery.

In this example, the function read_table_list() is defined to read the config file and build the list of tables to copy.

# --------------------------------------------------------------------------------
# Functions
# --------------------------------------------------------------------------------

def read_table_list(table_list_file):
    """
    Reads the master CSV file that will help in creating Airflow tasks in
    the DAG dynamically.
    :param table_list_file: (String) The file location of the master file,
    e.g. '/home/airflow/framework/master.csv'
    :return master_record_all: (List) List of Python dictionaries containing
    the information for a single row in master CSV file.
    """
    master_record_all = []
    logger.info('Reading table_list_file from : %s' % str(table_list_file))
    try:
        with open(table_list_file, 'r') as csv_file:  # text mode so csv.reader works on Python 3
            csv_reader = csv.reader(csv_file)
            next(csv_reader)  # skip the headers
            for row in csv_reader:
                logger.info(row)
                master_record = {
                    'table_source': row[0],
                    'table_dest': row[1]
                }
                master_record_all.append(master_record)
            return master_record_all
    except IOError as e:
        logger.error('Error opening table_list_file %s: %s',
                     table_list_file, e)

The name of the DAG is bq_copy_us_to_eu_01, and the DAG is not scheduled by default, so it needs to be triggered manually.

default_args = {
    'owner': 'airflow',
    'start_date': datetime.today(),
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# DAG object.
with models.DAG('bq_copy_us_to_eu_01',
                default_args=default_args,
                schedule_interval=None) as dag:

To define the Cloud Storage plugin, the class GCSPlugin(AirflowPlugin) is defined, mapping the hook and operator downloaded from the Airflow 1.10-stable branch.

"""
    GCS Plugin
    This plugin provides an interface to GCS operator from Airflow Master.
"""

from airflow.plugins_manager import AirflowPlugin

from gcs_plugin.hooks.gcs_hook import GoogleCloudStorageHook
from gcs_plugin.operators.gcs_to_gcs import \
    GoogleCloudStorageToGoogleCloudStorageOperator

class GCSPlugin(AirflowPlugin):
    name = "gcs_plugin"
    operators = [GoogleCloudStorageToGoogleCloudStorageOperator]
    hooks = [GoogleCloudStorageHook]

Viewing environment information

Go back to Composer to check on the status of your environment.

Once your environment has been created, click the name of the environment to see its details.

The Environment details page provides information such as the Airflow web UI URL, the Google Kubernetes Engine cluster ID, and the name of the Cloud Storage bucket that contains the DAGs folder.

instance-creation.png

Creating a virtual environment

Run the following command to download and update the package lists.

sudo apt-get update

Python virtual environments are used to isolate package installation from the system. Install the virtualenv package:

sudo apt-get install virtualenv

If prompted [Y/n], press Y and then Enter.

Then build the virtual environment:

virtualenv -p python3 venv

Activate the virtual environment.

source venv/bin/activate

Setting DAGs Cloud Storage bucket

In Cloud Shell, run the following to copy the name of the DAGs bucket from your Environment Details page and set a variable to refer to it in Cloud Shell:

Make sure to replace the placeholder in the following command with the name of your DAGs bucket. To find it, navigate to Navigation menu > Storage; the bucket name will be similar to us-central1-composer-advanc-YOURDAGSBUCKET-bucket.
DAGS_BUCKET=us-central1-composer-advanc-YOURDAGSBUCKET-bucket

You will be using this variable a few times during the lab.
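If you would rather not copy the name by hand, the DAGs path can also be read from the environment itself. This optional sketch prints the full gs:// path of the DAGs folder; the bucket name is the part between gs:// and /dags:

gcloud composer environments describe composer-advanced-lab \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"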

Setting Airflow variables

Airflow variables are an Airflow-specific concept that is distinct from environment variables. In this step, you'll set the following three Airflow variables used by the DAG you will deploy: table_list_file_path, gcs_source_bucket, and gcs_dest_bucket.

KEY | VALUE | Details
table_list_file_path | /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv | CSV file listing source and target tables, including dataset
gcs_source_bucket | {UNIQUE ID}-us | Cloud Storage bucket to use for exporting BigQuery tables from source
gcs_dest_bucket | {UNIQUE ID}-eu | Cloud Storage bucket to use for importing BigQuery tables at destination

The next gcloud composer command executes the Airflow CLI sub-command variables. gcloud passes the arguments after the -- separator through to the Airflow CLI in your environment.

To set the three variables, you will run the composer command once for each row from the above table. The form of the command is this:

gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION variables -- \
--set KEY VALUE
  • ENVIRONMENT_NAME is the name of the environment.
  • LOCATION is the Compute Engine region where the environment is located. The gcloud composer command requires including the --location flag or setting the default location before running the gcloud command.
  • KEY and VALUE specify the variable and its value to set. Include a space-two-dashes-space ( -- ) between the left-side gcloud-related arguments and the right-side Airflow sub-command-related arguments. Also include a space between the KEY and VALUE arguments.

For example, the gcs_source_bucket variable would be set like this:

gcloud composer environments run composer-advanced-lab \
--location us-central1 variables -- \
--set gcs_source_bucket My_Bucket-us

To see the value of a variable, run the Airflow CLI sub-command variables with the get argument or use the Airflow UI.

For example, run the following:

gcloud composer environments run composer-advanced-lab \
    --location us-central1 variables -- \
    --get gcs_source_bucket
Note: Make sure to set all three Airflow variables used by the DAG.
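For reference, the full set of three commands looks like the following, assuming the example bucket names 6552634-us and 6552634-eu (substitute your own bucket names):

gcloud composer environments run composer-advanced-lab \
    --location us-central1 variables -- \
    --set table_list_file_path /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv

gcloud composer environments run composer-advanced-lab \
    --location us-central1 variables -- \
    --set gcs_source_bucket 6552634-us

gcloud composer environments run composer-advanced-lab \
    --location us-central1 variables -- \
    --set gcs_dest_bucket 6552634-eu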

Uploading the DAG and dependencies to Cloud Storage

  1. Copy the Google Cloud Python docs samples files into your Cloud Shell:

cd ~
gsutil -m cp -r gs://spls/gsp283/python-docs-samples .
  2. Upload a copy of the third party hook and operator to the plugins folder of your Composer DAGs Cloud Storage bucket:

gsutil cp -r python-docs-samples/third_party/apache-airflow/plugins/* gs://$DAGS_BUCKET/plugins
  3. Next, upload the DAG and config file to the DAGs Cloud Storage bucket of your environment:

gsutil cp python-docs-samples/composer/workflows/bq_copy_across_locations.py gs://$DAGS_BUCKET/dags
gsutil cp python-docs-samples/composer/workflows/bq_copy_eu_to_us_sample.csv gs://$DAGS_BUCKET/dags

Cloud Composer registers the DAG in your Airflow environment automatically, and DAG changes occur within 3-5 minutes. You can see task status in the Airflow web interface and confirm the DAG is not scheduled as per the settings.
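If you want to confirm that the files landed where the DAG expects them, you can optionally list the bucket contents from Cloud Shell:

gsutil ls gs://$DAGS_BUCKET/dags
gsutil ls gs://$DAGS_BUCKET/plugins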

Using the Airflow UI

To access the Airflow web interface using the Cloud Console:

  1. Go back to the Composer Environments page.
  2. In the Airflow webserver column for the environment, click the new window icon.


  3. Click on your lab credentials.

  4. The Airflow web UI opens in a new browser window. Data will still be loading when you get here. You can continue with the lab while this is happening.

Viewing Variables

The variables you set earlier are persisted in your environment. You can view the variables by selecting Admin > Variables from the Airflow menu bar.


Trigger the DAG to run manually

Click on the DAGs tab and wait for the links to finish loading.

To trigger the DAG manually, click the play button for composer_sample_bq_copy_across_locations.


Click Trigger to confirm this action.
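As an alternative to the web UI, the DAG can also be triggered from Cloud Shell through the Airflow CLI. This is an optional sketch; it assumes the DAG ID shown in the DAGs list (composer_sample_bq_copy_across_locations):

gcloud composer environments run composer-advanced-lab \
    --location us-central1 trigger_dag -- composer_sample_bq_copy_across_locations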

Click Check my progress to verify the objective.

Uploading the DAG and dependencies to Cloud Storage

Exploring DAG runs

When you upload your DAG file to the DAGs folder in Cloud Storage, Cloud Composer parses the file. If no errors are found, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately if the schedule conditions are met; in this case the schedule is set to None, so the workflow runs only when it is triggered manually.

The DAG Runs status turns green once the play button is pressed.


Click the name of the DAG to open the DAG details page. This page includes a graphical representation of workflow tasks and dependencies.


Now, in the toolbar, click Graph View, then mouseover the graphic for each task to see its status. Note that the border around each task also indicates the status (green border = running; red = failed, etc.).


To run the workflow again from the Graph View:

  1. In the Airflow UI Graph View, click the start graphic.
  2. Click Clear to reset all the tasks and then click OK to confirm.


Refresh your browser while the process is running to see the most recent information.

Validate the results

Now check the status and results of the workflow by going to these Cloud Console pages:

  • The exported tables were copied from the US bucket to the EU Cloud Storage bucket. Click on Storage to see the intermediate Avro files in the source (US) and destination (EU) buckets.
  • The list of tables was imported into the target BigQuery dataset. Click on BigQuery, then click on your project name and the nyc_tlc_EU dataset to validate that the tables are accessible from the dataset you created (you can also check from Cloud Shell, as shown below).
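You can also spot-check both locations from Cloud Shell. This optional sketch assumes the example destination bucket name 6552634-eu (substitute your own):

gsutil ls gs://6552634-eu
bq ls nyc_tlc_EU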


Congratulations!

You've completed this advanced lab and copied two tables programmatically from US to EU! This lab is based on a blog post by David Sabater Dinter.

Take Your Next Lab

Continue your Quest with Predict Visitor Purchases with a Classification Model in BQML, or check out these suggestions:

Next steps

Google Cloud Training & Certification

...helps you make the most of Google Cloud technologies. Our classes include technical skills and best practices to help you get up to speed quickly and continue your learning journey. We offer fundamental to advanced level training, with on-demand, live, and virtual options to suit your busy schedule. Certifications help you validate and prove your skill and expertise in Google Cloud technologies.

Manual Last Updated March 23, 2021
Lab Last Tested March 23, 2021

Copyright 2021 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.