This comprehensive tutorial delves deep into Prefect, a powerful workflow orchestration tool.
Whether you’re a beginner or an experienced developer, this will guide you through the essential aspects of using Prefect for creating and managing data pipelines.
Today I’m excited to introduce you to Prefect.
Prefect is a Python tool that we can use to automate and manage workflows and data pipelines.
If you are writing a complex program that involves multiple tasks executed in sequence, and you want to handle errors and schedule the execution, Prefect is the tool for you.
Prefect makes it easy to define tasks, schedule them, and monitor the workflow.
Whether you are processing data, running machine learning algorithms, or automating something, Prefect can be useful for you.
What can you expect from this tutorial?
The Prefect dashboard gives you real-time insight into all the tasks that are executed: whether they completed successfully, failed, or raised an error.
By the end of this tutorial, you will have a solid understanding of Prefect.
You will also be able to write Prefect tasks and execute a workflow.
Whether you are new to Prefect or want to sharpen your Prefect skills, this tutorial is for you. Let’s dive in and start writing Prefect code.
Let’s start coding.
Now we are going to install Prefect on your system. Before installing it, I recommend creating a virtual environment.
To create the virtual environment, use the following command.
python -m venv env-test
You can give your virtual environment any name (here, env-test). Once you execute this command, the virtual environment is created.
Activate the virtual environment.
source env-test/bin/activate
Once you run this command, you are inside the virtual environment.
Install Prefect on your system by running the following pip command.
pip install prefect
You are all done with the installation.
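To double-check the setup from Python itself, a small script can report whether the interpreter is running inside a virtual environment and whether the prefect package can be found. This is only a sketch that uses the standard library; the helper names in_virtualenv and is_installed are made up for this example.

```python
import importlib.util
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter runs inside a venv-style environment."""
    # In a venv, sys.prefix points at the environment and
    # sys.base_prefix at the Python installation it was created from.
    return sys.prefix != sys.base_prefix


def is_installed(package: str) -> bool:
    """Return True when a top-level package can be located without importing it."""
    return importlib.util.find_spec(package) is not None


if __name__ == "__main__":
    print("Inside a virtual environment:", in_virtualenv())
    print("prefect installed:", is_installed("prefect"))
```

If the second line prints False, the pip command was most likely run outside the activated environment.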
Here is sample code that defines a Prefect task and flow.
Let’s save it as hello_prefect.py.
from prefect import task, flow

@task
def say_hello():
    print("Hello, World!")

@flow
def my_flow():
    say_hello()

if __name__ == "__main__":
    my_flow()
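Explanation:
First, import task and flow from Prefect.
Define a function called say_hello. Inside it, we print the message “Hello, World!”. This is a simple function.
To turn this function into a Prefect task, add the @task decorator.
Define another function called my_flow. Inside this function, call the function say_hello.
To turn the my_flow function into a Prefect flow, add the @flow decorator.
Finally, call my_flow.
Output: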
Execute the program as
python hello_prefect.py
You will get the following output.
16:58:38.840 | INFO | prefect.engine - Created flow run 'fuzzy-kittiwake' for flow 'my-flow'
16:58:38.883 | INFO | Flow run 'fuzzy-kittiwake' - Created task run 'say_hello-0' for task 'say_hello'
16:58:38.883 | INFO | Flow run 'fuzzy-kittiwake' - Executing 'say_hello-0' immediately...
Hello, World!
16:58:38.923 | INFO | Task run 'say_hello-0' - Finished in state Completed()
16:58:38.939 | INFO | Flow run 'fuzzy-kittiwake' - Finished in state Completed('All states completed.')
You can see the “Hello, World!” message along with some info logs. These info logs are emitted by Prefect as it runs the flow.
This is how you can write a simple task and flow.
We can also add multiple tasks to a flow.
Let’s create another task, say_prefect, and call it from the Prefect flow.
from prefect import task, flow

@task
def say_hello():
    print("Hello, World!")

@task
def say_prefect():
    print("Welcome to Prefect!")

@flow
def my_flow():
    say_hello()
    say_prefect()

if __name__ == "__main__":
    my_flow()
Output:
16:50:03.619 | INFO | prefect.engine - Created flow run 'belligerent-pronghorn' for flow 'my-flow'
16:50:03.666 | INFO | Flow run 'belligerent-pronghorn' - Created task run 'say_hello-0' for task 'say_hello'
16:50:03.667 | INFO | Flow run 'belligerent-pronghorn' - Executing 'say_hello-0' immediately...
Hello, World!
16:50:03.708 | INFO | Task run 'say_hello-0' - Finished in state Completed()
16:50:03.721 | INFO | Flow run 'belligerent-pronghorn' - Created task run 'say_prefect-0' for task 'say_prefect'
16:50:03.722 | INFO | Flow run 'belligerent-pronghorn' - Executing 'say_prefect-0' immediately...
Welcome to Prefect!
16:50:03.759 | INFO | Task run 'say_prefect-0' - Finished in state Completed()
16:50:03.775 | INFO | Flow run 'belligerent-pronghorn' - Finished in state Completed('All states completed.')
You can see the message “Hello, World”. You can also see the message “Welcome to Prefect”.
Both tasks are executed as a part of the flow.
Here you see the message “All states completed”. This means that we executed the Prefect flow successfully.
This is a very simple program that runs a Prefect flow.
This is how you can use Prefect in Python to manage and automate workflow execution and data pipelines.
Now we’re going to look at a real-world example where you can use a Prefect flow and tasks.
Let’s say you are writing an ETL program.
As part of ETL, we perform three operations: extract the data, transform it, and load it.
So basically it involves three tasks.
Here is the program that automates the ETL process using Prefect.
from prefect import flow, task
import requests

@task(retries=2)
def extract_data():
    # Fetch sample posts from a public test API
    url = "https://jsonplaceholder.typicode.com/posts"
    response = requests.get(url)
    # Raise on HTTP error codes so that a failed request triggers a retry
    response.raise_for_status()
    data = response.json()
    return data

@task
def transform_data(data):
    # Keep only the title of each post
    transformed_data = [item['title'] for item in data]
    return transformed_data

@task
def load_data(transformed_data):
    # Just print the first 5 for simplicity
    for title in transformed_data[:5]:
        print(f"Loading title: {title}")

@flow
def etl_flow(name="ETL flow"):
    # name is an ordinary, unused flow parameter; the flow itself is still called 'etl-flow'
    data = extract_data()
    transformed_data = transform_data(data)
    load_data(transformed_data)

if __name__ == "__main__":
    etl_flow()
Explanation:
First, import flow and task from prefect.
In the extract_data task, we get the data from this URL.
The second task is transform_data. Here we transform the raw data to get the desired information.
The third task is load_data. In load_data, we are supposed to save the data into a database. But instead of storing it in a database, we are just going to print it.
The flow etl_flow calls the three tasks in order: extract_data, transform_data, and load_data.
Save the file as etl_prefect_flow.py.
Output:
Let’s execute the program.
python etl_prefect_flow.py
You see the following output text.
17:23:15.720 | INFO | prefect.engine - Created flow run 'bronze-pug' for flow 'etl-flow'
17:23:15.792 | INFO | Flow run 'bronze-pug' - Created task run 'extract_data-0' for task 'extract_data'
17:23:15.792 | INFO | Flow run 'bronze-pug' - Executing 'extract_data-0' immediately...
17:23:15.966 | INFO | Task run 'extract_data-0' - Finished in state Completed()
17:23:15.990 | INFO | Flow run 'bronze-pug' - Created task run 'transform_data-0' for task 'transform_data'
17:23:15.990 | INFO | Flow run 'bronze-pug' - Executing 'transform_data-0' immediately...
17:23:16.034 | INFO | Task run 'transform_data-0' - Finished in state Completed()
17:23:16.051 | INFO | Flow run 'bronze-pug' - Created task run 'load_data-0' for task 'load_data'
17:23:16.052 | INFO | Flow run 'bronze-pug' - Executing 'load_data-0' immediately...
Loading title: sunt aut facere repellat provident occaecati excepturi optio reprehenderit
Loading title: qui est esse
Loading title: ea molestias quasi exercitationem repellat qui ipsa sit aut
Loading title: eum et est occaecati
Loading title: nesciunt quas odio
17:23:16.101 | INFO | Task run 'load_data-0' - Finished in state Completed()
17:23:16.128 | INFO | Flow run 'bronze-pug' - Finished in state Completed('All states completed.')
You can see that the program executed successfully. All states are completed.
This is the simple ETL flow that we have created using Prefect.
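To see what the transform step does on its own, here is the same list comprehension applied to a tiny hand-written sample. The sample records are made up; they only imitate the shape of the posts returned by the URL (userId, id, title, body). The function name extract_titles is also just for this illustration.

```python
def extract_titles(posts):
    """Same transformation as transform_data, written as a plain function."""
    return [post["title"] for post in posts]


# Made-up records shaped like the posts from the API
sample_posts = [
    {"userId": 1, "id": 1, "title": "first post", "body": "..."},
    {"userId": 1, "id": 2, "title": "second post", "body": "..."},
]

print(extract_titles(sample_posts))  # ['first post', 'second post']
```

Keeping the transformation this small makes it easy to check by hand before wiring it into a flow.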
You can also use additional features provided by Prefect.
When you extract data from an external website, there is no guarantee that you will always get the data.
If you don’t get the data, or an error occurs while extracting it, you can retry the function.
Add retries=2 to the task.
This means that if the function fails, Prefect retries it up to two more times.
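To make the meaning of retries=2 concrete, here is a plain-Python sketch of the idea: one initial attempt, plus at most two more if the function raises. This is not how Prefect implements retries internally; run_with_retries and flaky_extract are hypothetical names used only for this illustration.

```python
def run_with_retries(func, retries=2):
    """Call func once, then retry up to `retries` more times if it raises."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return func(), attempt
        except Exception:
            if attempt > retries:
                raise  # every allowed attempt has failed


calls = {"count": 0}


def flaky_extract():
    """Hypothetical stand-in for extract_data: fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary network problem")
    return ["post"]


result, attempts = run_with_retries(flaky_extract, retries=2)
print(result, attempts)  # ['post'] 3
```

With Prefect you do not write this loop yourself. The task decorator also accepts retry_delay_seconds if you want to wait between attempts.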
Now let’s see how you can start the Prefect server and open the Prefect dashboard.
The command is very simple.
prefect server start
If you execute this command, you get the Prefect dashboard URL.
___ ___ ___ ___ ___ ___ _____
| _ \ _ \ __| __| __/ __|_ _|
| _/ / _|| _|| _| (__ | |
|_| |_|_\___|_| |___\___| |_|
Configure Prefect to communicate with the server with:
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
View the API reference documentation at http://127.0.0.1:4200/docs
Check out the dashboard at http://127.0.0.1:4200
This is the localhost URL. Open that in your browser.
In the Prefect dashboard, you can monitor your Prefect tasks and flows.
This is how you can start the Prefect server and open the Prefect dashboard, where you can monitor all the tasks.
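If the dashboard does not load, it can help to check from Python whether the server is reachable at all. The sketch below uses only the standard library. It assumes the server listens on the default address printed above and exposes a health endpoint at /api/health; treat that path as an assumption and adjust it if your version differs. The function name server_is_up is made up for this example.

```python
import urllib.request


def server_is_up(base_url="http://127.0.0.1:4200", timeout=2.0):
    """Return True if the (assumed) health endpoint answers with HTTP 200."""
    health_url = base_url.rstrip("/") + "/api/health"  # assumed endpoint path
    try:
        with urllib.request.urlopen(health_url, timeout=timeout) as response:
            return response.status == 200
    except OSError:
        # Covers connection refused, timeouts, and HTTP error responses
        return False


if __name__ == "__main__":
    print("Prefect server reachable:", server_is_up())
```

A False result usually means the prefect server start command is not running, or it is running on a different host or port.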
Now let’s understand what Prefect is used for and what it can actually do.
By now, you should have a solid understanding of Prefect and how to use it to create tasks and flows.
Here are some more ideas for exploring Prefect further.
Experiment with more complex workflows. Explore the more advanced features provided by Prefect, such as mapping and result handling.
Check out the Prefect documentation for more insight into Prefect.
I hope you found this tutorial useful and are excited to use Prefect in your project.
If you have any questions or need further clarification, you can reach out to me in the comments or connect with me. I’d love to hear how you are going to use Prefect in your project.
Happy coding with Prefect.