Batch Pipelines And How To Build Them Using Cloud Data Fusion

Batch Pipelines And How To Build Them Using Cloud Data Fusion

In this article I will cover how to build batch pipelines and deploy them on google cloud using Cloud Data Fusion. First of all let's talk about the different types of pipeline and why you need them in your life in the first place.

Well Pipelines are a way to move big data from point A to point B. And while you do this commute, you can process this data as well.

And we have different structures of pipelines and you can choose between them based on your business case. First we have the famous "ETL" and the "E" is for extract, And that's when we extract the data from source. And the "T" is for transform and that's when we do processing. And finally the "L" is for load and that's when we load the data to target.

If you need to move your data to target right now and the transformation is minor then you'll have an "ELT". In this case you are going to process data on target right away.

And if you have some luck on your side you'll use an "EL". In this case your row data is clean to begin with and you'll only build a pipeline to just move it from source to target.

After you decide on the pipeline structure you want to use, you can pick the type. Basically you have two types. Batch Pipeline and Stream Pipeline. The difference between them is really simple. Batch is a type of pipeline to use when you want to process data in batches.

Imagine if you have a commercial website and you wanted to get some data like: product reviews, clicks per product and some other data to do processing on them to get insights on the products. Or maybe this data will be a training dataset to your machine learning product recommendation system.

In this case you don't need a real-time stream of data. You will only need a scheduled update to your target data sink every end of day. Unlike Stream Pipeline, You'll need a Real-Time streaming river of data and while you have this HUGE amount of data you need to process it as well.

For example, Uber uses data stream processing because simply we move faster than data some times so batch processing would not be beneficial.

Now, Let's talk about implementation.

Google cloud gives us a really broad selection on how to build these pipelines. Here I will cover how to build batch pipelines using Cloud Data Fusion.

What is Cloud Data Fusion?

Cloud Data Fusion allows you to build scalable data integration solutions without having to manage the infrastructure.

Step 1: Create a Cloud Data Fusion instance

  • Open your account on GCP and check if you have the Fusion API enabled. If not, On the search bar type "APIs & Services" then choose "Enable APIs and Services". Then search for "Fusion API" and click "Enable". Then type in the search bar "Data Fusion" and click on "Create an Instance".
No alt text provided for this image
  • Enter a name for your instance and make it descriptive. Building the instance will take around 15 minutes so you can get a cup of coffee first.
  • Congratulations your very first new data fusion instance is now available. You'll need to grant the instance permissions on your running project. Navigate to the instance detail page by clicking the instance name.
No alt text provided for this image
  • Copy the service account. Also, In the GCP Console navigate to the IAM & Admin >> IAM.
  • On the IAM Permissions page, add the service account you copied earlier as a new member and grant the Cloud Data Fusion API Service Agent role, by clicking the Add button. And don't forget to save.
No alt text provided for this image

Step 2: Loading the data

Well, Now after you have your Cloud Data Fusion instance up and running. You need to create your data source. And that's simply a Google Cloud Bucket. And while we at it copy our online dataset as well.

  • Open the cloud shell console on the top right corner on the screen and execute the following commands.
export BUCKET=$GOOGLE_CLOUD_PROJECT
gsutil mb gs://$BUCKET
gsutil cp gs://cloud-training/OCBL017/ny-taxi-2018-sample.csv gs://$BUCKET
  • Now you created a bucket with the name of your project id. Also, create another one with "-temp" at the end using the following command.
gsutil mb gs://$BUCKET-temp
  • On the Cloud Data Fusion instances page. Click on "View Instance".

Introducing Wrangler: is an interactive, visual tool that lets you see the effects of transformations on a small subset of your data before dispatching large, parallel-processing jobs on the entire dataset.

  • Choose Wrangler. On the left side is a panel. Under Google Cloud Storage, select Cloud Storage Default. Click on the bucket corresponding to your project name. Select ny-taxi-2018-sample.csv. The data is loaded into the Wrangler screen in row/column form.
No alt text provided for this image

Step 3: Cleaning the data

Now you'll do some simple transformations on your dataset according to your business need.

  • In my taxi dataset for example. I have trip_distance column values with negative values so these values must be filtered before processing. Here I’ll simply add a condition.
No alt text provided for this image

Step 4: Creating the pipeline

Cloud Data Fusion translates your visually built pipeline into an Apache Spark or MapReduce program that executes transformations on an ephemeral Cloud Dataproc cluster in parallel. This enables you to easily execute complex transformations over vast quantities of data in a scalable manner, without having to worry about infrastructure.

Introducing Dataproc: Dataproc is a managed Spark and Hadoop service for batch processing, querying, streaming, and machine learning. Dataproc automation helps you create clusters quickly, manage them easily.

  • On the upper-right side of the Google Cloud Fusion UI, click Create a Pipeline. Then select Batch pipeline.
No alt text provided for this image
  • Now you'll see two nodes. GCSFile source node connected into a Wrangler node. Wrangler node contains all the transformations you applied.
No alt text provided for this image

Step 5: Adding a data source

Now the taxi dataset that I am using contains several columns such as pickup_location_id, That isn't quite transparent. So I’m going to add a data source to the pipeline that maps pickup_location_id to a relevant location name that can be easily understood. The mapping info here will be stored in a BigQuery.

 Introducing BigQuery: Is a cloud data warehouse that is serverless, highly scalable and designed for business agility. Very reliable tool for analyzing petabytes of data at blazing-fast speeds.

  • Open separate tab and in the search bar type “BigQuery
  • Select your Project ID and from the left pane, and then create new dataset and give the name “trips
  • Go to More >> Query Settings. This will ensure that you have access permission from data fusion.
  • Under Destination select Set a destination table for query results. Also, Under Table name enter zone_id_mapping. Don’t forget to save.
No alt text provided for this image


  • On the Query Editor run the following query.
SELECT

  zone_id,

  zone_name,

  borough

FROM

  `bigquery-public-data.new_york_taxi_trips.taxi_zone_geom`

The beautiful thing about BigQuery here is that it understands that we are running a query against a public dataset. You don’t actually have to store data anymore to find insights in it.

  • If you are using the same dataset that I’m using you should find the following output familiar. And you can find zone_id
No alt text provided for this image
  • Now go back to your Data Fusion page and from the Plugin palette on the left, Select BigQuery. A BigQuery node will appear now on your fusion canvas.
  • Hover over this node and select Properties.
  • To configure the Reference Name, enter zone_mapping, which is used to identify this data source for lineage purposes. The BigQuery Dataset and Table configurations are the Dataset and Table you Setup in BigQuery a few steps earlier: trips and zone_id_mapping. For Temporary Bucket Name input the name of your project followed by "-temp", which corresponds to the bucket you created in Step 2.
No alt text provided for this image
  • To populate the schema of this table from BigQuery, click Get Schema. The fields will appear on the right side.

Step 6: Joining two sources

Now our mission is to join the two data sources—taxi trip data and zone names—to generate some meaningful output.

  • Under the Analytics section in the Plugin Palette. Choose Joiner, New Joiner node appears on the canvas.
  • To connect both Wrangler Node and BigQuery Node with the Joiner node, Simply drag the connection arrow.
No alt text provided for this image
  • Now go to Joiner node Properties. Change the Join Type to Inner. This is important because no one likes data duplicates.
  • Set the Join Condition to join the pickup_location_id column in the Wrangler node to the zone_id column in the BigQuery node.
No alt text provided for this image

Step 7: Storing the output to BigQuery

You need a data sink and this will be your target data sink.

  • In the sink section from the Plugin Palette. Choose BigQuery.
  • Connect the Joiner node to it
No alt text provided for this image
  • Open the BigQuery node by clicking on it and then selecting Properties. You will next configure the node as shown below. You will use a configuration that's similar to the existing BigQuery source. Provide bq_insert for the Reference Name field and then use trips for the dataset and the name of your project followed by "-temp" as Temporary Bucket Name. You will write to a new table that will be created for this pipeline execution. In the table field, enter trips_pickup_name.
No alt text provided for this image

Step 8: Deploying and running the pipeline

Now if you made ‘till here, Congratulations, you are now about to deploy your very first pipeline on google cloud. How cool is that!

  • Give your pipeline a name and a description. Click OK.
  • Now, Click Deploy.
No alt text provided for this image
  • On the next screen click Run to start processing data.
No alt text provided for this image

Step 9: Viewing the results

  • Return to the tab where you have BigQuery open. Run the query below to see the values in the trips_pickup_name table.
SELECT

  *

FROM

  `trips.trips_pickup_name`

Now, you can see your pipeline's output like the following screenshot.

No alt text provided for this image

This was how to build batch pipelines using Cloud Data Fusion. On the next article we will learn how to build more complex pipelines using Google Cloud Dataflow and Python.

To view or add a comment, sign in

Others also viewed

Explore content categories