Mastering Workflow Automation: Using Prefect with Python for Data Pipelines

This comprehensive tutorial delves deep into Prefect, a powerful workflow orchestration tool.

Whether you’re a beginner or an experienced developer, this will guide you through the essential aspects of using Prefect for creating and managing data pipelines.

I have covered the following topics in this tutorial.

  • What is Prefect?
    Understand the basics of Prefect and its significance in workflow orchestration.
  • How to install the Prefect Python package
    Step-by-step guide to installing the Prefect package in your Python environment.
  • Writing a data pipeline workflow in Python with Prefect tasks and flows
    Learn how to write data pipeline workflows using Prefect tasks and flows.
  • ETL workflow in Prefect [real-world example]
    A real-world example demonstrating an ETL workflow using Prefect.
  • Retrying tasks in Prefect
    Explore the retries option in Prefect and how it makes your workflows more reliable.
  • Starting the Prefect server dashboard
    Set up your Prefect server dashboard for better workflow monitoring and control.

Based on your preference, you can watch this video tutorial or continue reading.

Today I’m excited to introduce you to Prefect.

What is Prefect?

Prefect is a Python tool for automating and managing workflows and data pipelines.

If you are writing a complex program that runs multiple tasks in sequence, and you want to handle errors and schedule the execution, Prefect is the tool for you.

Prefect makes it easy to define tasks, schedule them, and monitor the workflow.

Whether you are processing data, running machine learning algorithms, or automating something else, Prefect can be useful for you.

What can you expect from this tutorial?

  • The basics of Prefect.
  • How to write a Prefect task and combine multiple tasks into a flow.
  • How Prefect executes tasks.
  • How Prefect handles dependencies between multiple tasks.
  • How to monitor tasks using the Prefect dashboard.

The Prefect dashboard gives you real-time insight into every task run: whether it completed successfully, failed, or raised an error.

By the end of this tutorial, you will have a solid understanding of Prefect.

You will also be able to write Prefect tasks and execute a workflow.

Whether you are new to Prefect or want to sharpen your Prefect skills, this tutorial is for you. Let’s dive in and start writing Prefect code.

Install Python Prefect

Let’s start coding.

Now we are going to install Prefect on your system. Before installing it, I recommend creating a virtual environment.

To create the virtual environment, use the following command.

python -m venv env-test

You can give your virtual environment any name (here, env-test). Once you execute this command, the virtual environment is created.

Activate the virtual environment. Use the same name you chose above.

source env-test/bin/activate

On Windows, run env-test\Scripts\activate instead.

Once you execute these commands, you will be inside the virtual environment.

Install Prefect on your system by running the following pip command.

pip install prefect

You are all done with the installation.
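
If you want to confirm that the package landed in the active environment, a small standard-library check like the one below can help. This is only a sketch: installed_version is a hypothetical helper name, and it simply asks Python's package metadata whether a distribution called prefect is present.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name):
    """Return the installed version string of a package, or None if it is missing."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("prefect")
    if found is None:
        print("prefect is not installed in this environment")
    else:
        print(f"prefect {found} is installed")
```

If the script reports that prefect is missing, check that the virtual environment is activated and run the pip command again.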

Create Task and Flow | Your First Prefect Code

Here is some sample code that defines a Prefect task and flow.

Let’s save it as hello_prefect.py.

from prefect import task, flow

@task
def say_hello():
    print("Hello, World!")

@flow
def my_flow():
    say_hello()

if __name__ == "__main__":
    my_flow()

Explanation:

  • We import task and flow from prefect.
  • We write a simple function, say_hello, that prints the message “Hello, World!”.
  • To convert this function into a Prefect task, add the @task decorator.
  • We create another function, my_flow, and call say_hello inside it.
  • To convert my_flow into a Prefect flow, add the @flow decorator.
  • Finally, we call my_flow.

Output:

Execute the program as

python hello_prefect.py

You will get the following output.

16:58:38.840 | INFO    | prefect.engine - Created flow run 'fuzzy-kittiwake' for flow 'my-flow'
16:58:38.883 | INFO | Flow run 'fuzzy-kittiwake' - Created task run 'say_hello-0' for task 'say_hello'
16:58:38.883 | INFO | Flow run 'fuzzy-kittiwake' - Executing 'say_hello-0' immediately...
Hello, World!
16:58:38.923 | INFO | Task run 'say_hello-0' - Finished in state Completed()
16:58:38.939 | INFO | Flow run 'fuzzy-kittiwake' - Finished in state Completed('All states completed.')

You can see the Hello, World! message along with some info logs. These info logs are emitted by Prefect as it runs the flow.

This is how you can write a simple task and the flow.

We can also add multiple tasks into the flow.

Let’s create another task, say_prefect, and call it from the Prefect flow.

from prefect import task, flow

@task
def say_hello():
    print("Hello, World!")

@task
def say_prefect():
    print("Welcome to Prefect!")

@flow
def my_flow():
    say_hello()
    say_prefect()

if __name__ == "__main__":
    my_flow()

Output:

16:50:03.619 | INFO    | prefect.engine - Created flow run 'belligerent-pronghorn' for flow 'my-flow'
16:50:03.666 | INFO | Flow run 'belligerent-pronghorn' - Created task run 'say_hello-0' for task 'say_hello'
16:50:03.667 | INFO | Flow run 'belligerent-pronghorn' - Executing 'say_hello-0' immediately...
Hello, World!
16:50:03.708 | INFO | Task run 'say_hello-0' - Finished in state Completed()
16:50:03.721 | INFO | Flow run 'belligerent-pronghorn' - Created task run 'say_prefect-0' for task 'say_prefect'
16:50:03.722 | INFO | Flow run 'belligerent-pronghorn' - Executing 'say_prefect-0' immediately...
Welcome to Prefect!
16:50:03.759 | INFO | Task run 'say_prefect-0' - Finished in state Completed()
16:50:03.775 | INFO | Flow run 'belligerent-pronghorn' - Finished in state Completed('All states completed.')

You can see both messages, “Hello, World!” and “Welcome to Prefect!”.

Both tasks are executed as a part of the flow.

The message “All states completed” means that the Prefect flow ran successfully.

This is a very simple program, but it shows how a Prefect flow is executed.

This is how you can use Prefect in Python to manage and automate workflow execution and data pipelines.

Managing ETL Flow using Prefect

Now we’re going to look at a real-world example where you can use Prefect flows and tasks.

Let’s say you are writing an ETL program.

As a part of ETL, we perform the following operations.

  • Extract the data from a website,
  • Transform that data, and
  • Load (store) the data in a database.

So basically it involves three tasks.

Here is the program we are going to run to automate the ETL process using Prefect.

from prefect import flow, task
import requests

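@task(retries=2)
def extract_data():
    url = "https://jsonplaceholder.typicode.com/posts"
    response = requests.get(url)
    # Raise on HTTP error statuses so a failed request counts as a task failure
    response.raise_for_status()
    data = response.json()
    return data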

@task
def transform_data(data):
    transformed_data = [item['title'] for item in data]
    return transformed_data

@task
def load_data(transformed_data):
    # Just print the first 5 for simplicity
    for title in transformed_data[:5]:
        print(f"Loading title: {title}")

@flow
def etl_flow():
    data = extract_data()
    transformed_data = transform_data(data)
    load_data(transformed_data)

if __name__ == "__main__":
    etl_flow()

Explanation:

  • Import flow and task from prefect.
  • Import the requests module. We use requests to fetch the data from the website.
  • Create three tasks, that is, one function per task.
    The first task extracts the data.
    The second task transforms the data.
    The third task loads the data.
  • Inside the extract_data task, we get the data from the URL.
  • Then we parse the JSON response into Python objects (here, a list of dictionaries) and return it.
  • The transform_data task transforms the raw data to get the desired information, in this case the title of each post.
  • Then there is load_data. This task is supposed to save the data into a database, but to keep things simple we just print it.
  • These are the three tasks we have created as a part of ETL.
  • Write etl_flow as a simple Python function with the @flow decorator. Inside it, we call the three tasks extract_data, transform_data, and load_data.
  • Execute the flow by calling etl_flow.
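
To see what the transform step does without calling the API, the same list comprehension can be tried on a hand-made sample. This is a plain-Python sketch, not part of the original program: extract_titles and sample_posts are hypothetical names, and the records below are made up to mimic the shape of the posts returned by the URL.

```python
def extract_titles(posts):
    """Mirror of transform_data: keep only the 'title' field of each post."""
    return [post["title"] for post in posts]


# Made-up records in the same shape as the API response (not real API data)
sample_posts = [
    {"userId": 1, "id": 1, "title": "first post", "body": "..."},
    {"userId": 1, "id": 2, "title": "second post", "body": "..."},
]

if __name__ == "__main__":
    for title in extract_titles(sample_posts):
        print(f"Loading title: {title}")
```

Keeping the transformation this small makes it easy to check on its own before wiring it into the flow.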

Save the file as etl_prefect_flow.py.

Output:

Let’s execute the program.

python etl_prefect_flow.py

You see the following output text.

17:23:15.720 | INFO    | prefect.engine - Created flow run 'bronze-pug' for flow 'etl-flow'
17:23:15.792 | INFO | Flow run 'bronze-pug' - Created task run 'extract_data-0' for task 'extract_data'
17:23:15.792 | INFO | Flow run 'bronze-pug' - Executing 'extract_data-0' immediately...
17:23:15.966 | INFO | Task run 'extract_data-0' - Finished in state Completed()
17:23:15.990 | INFO | Flow run 'bronze-pug' - Created task run 'transform_data-0' for task 'transform_data'
17:23:15.990 | INFO | Flow run 'bronze-pug' - Executing 'transform_data-0' immediately...
17:23:16.034 | INFO | Task run 'transform_data-0' - Finished in state Completed()
17:23:16.051 | INFO | Flow run 'bronze-pug' - Created task run 'load_data-0' for task 'load_data'
17:23:16.052 | INFO | Flow run 'bronze-pug' - Executing 'load_data-0' immediately...
Loading title: sunt aut facere repellat provident occaecati excepturi optio reprehenderit
Loading title: qui est esse
Loading title: ea molestias quasi exercitationem repellat qui ipsa sit aut
Loading title: eum et est occaecati
Loading title: nesciunt quas odio
17:23:16.101 | INFO | Task run 'load_data-0' - Finished in state Completed()
17:23:16.128 | INFO | Flow run 'bronze-pug' - Finished in state Completed('All states completed.')

You can see that the program executed successfully. All states are completed.

This is the simple ETL flow that we have created using Prefect.

You can also use additional features provided by Prefect.

Retry Task in Prefect

Suppose you are extracting data from an external URL. When you pull data from an external website, there is no guarantee that the request will succeed.

If you don’t get the data, or an error occurs while extracting it, you can retry the task.

Add retries=2 to the task decorator, as we did for extract_data.

This means that if the task raises an exception, Prefect runs it again, up to two more times.
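
Conceptually, retries=2 allows up to three attempts in total: the first run plus two retries. The plain-Python sketch below imitates that behaviour so you can see it in isolation. It is not Prefect's implementation; run_with_retries and flaky_extract are hypothetical names used only for illustration.

```python
def run_with_retries(func, retries=2):
    """Call func; on an exception, try again up to `retries` more times."""
    attempts = retries + 1  # first run plus the retries
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as exc:
            print(f"Attempt {attempt} failed: {exc}")
            if attempt == attempts:
                raise  # out of retries, surface the last error


calls = {"count": 0}


def flaky_extract():
    """Fail on the first two calls, then succeed (stand-in for a shaky API)."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary network problem")
    return ["some", "data"]


if __name__ == "__main__":
    print(run_with_retries(flaky_extract, retries=2))
```

In Prefect itself you do not write this loop; @task(retries=2) handles it for you, and the retry_delay_seconds argument adds a pause between attempts.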

Create Prefect Dashboard Server

Now let’s see how you can start the Prefect server and open the Prefect dashboard.

The command is very simple.

prefect server start

When you execute this command, it prints the Prefect dashboard URL.

 ___ ___ ___ ___ ___ ___ _____ 
| _ \ _ \ __| __| __/ __|_ _|
| _/ / _|| _|| _| (__ | |
|_| |_|_\___|_| |___\___| |_|

Configure Prefect to communicate with the server with:

prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

View the API reference documentation at http://127.0.0.1:4200/docs

Check out the dashboard at http://127.0.0.1:4200

This is a localhost URL. Open it in your browser.

In the Prefect dashboard, you can monitor your Prefect tasks and flows.

  • You see the time when the ETL flow ran.
  • Inside the ETL flow, you can see that there are three tasks and that all three completed successfully.
  • You can also see the execution time of each task.
  • All these tasks are executed sequentially, one after another.

This is how you can start the Prefect server and open the Prefect dashboard, where you can monitor all the tasks.
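
Before opening the browser, you can also check from Python that the server is reachable. This is a minimal sketch using only the standard library. It assumes the default address printed in the start-up message and assumes the server answers on a health endpoint at /api/health, so adjust the URL if your setup differs. server_is_up is a hypothetical helper name.

```python
import urllib.error
import urllib.request

# Assumed default address, taken from the server start-up message
HEALTH_URL = "http://127.0.0.1:4200/api/health"


def server_is_up(url=HEALTH_URL, timeout=3):
    """Return True if the URL answers with HTTP 200, False otherwise."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or an HTTP error status
        return False


if __name__ == "__main__":
    if server_is_up():
        print("Prefect server is reachable")
    else:
        print("Prefect server is not reachable; is 'prefect server start' running?")
```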

Why Is Prefect a Game-Changing Workflow Tool?

Now let’s understand what Prefect is used for and what it can actually do.

  • Prefect is a task management system: we break the workflow into smaller tasks, and each task does a specific part of the job, such as downloading data, processing it, or storing it in a database.
  • Prefect lets us combine multiple tasks into one single flow.
  • It also allows us to add retries and error handling to our tasks.
  • Another feature is scheduling. With scheduling, you can run a flow automatically at a specific time or at regular intervals.
  • The other feature is monitoring. Using the Prefect dashboard, you can monitor all the tasks in a flow to make sure they executed successfully.

By now, you should have a solid understanding of Prefect and how to use it to create tasks and flows.

Here are some more ideas for exploring Prefect further.

Experiment with more complex workflows. Explore the more advanced features provided by Prefect, such as mapping and result handling.

Check out the Prefect documentation to learn more about Prefect.

I hope you found this tutorial useful and are excited to use Prefect in your project.

If you have any questions or need further clarification, you can reach out to me in the comments or connect with me. I’d love to hear how you are going to use Prefect in your project.

Happy coding with Prefect.
