Prefect: Deploy Flows on Work Pool
Prefect
Prefect is a workflow management system that allows you to define, schedule, and monitor workflows.
Introduction
In this post, we will learn how to deploy Prefect flows on a work pool so that they can be triggered via the API.
Prerequisites
- Python installed
- Prefect installed
- Prefect server running (command: prefect server start)
Steps
- Create a work pool
- Transform function to flow
- Deploy flow on work pool
- Trigger flow via API
Create a work pool
Create a Process work pool:
prefect work-pool create --type process my-work-pool
Check the work pool
prefect work-pool ls
Start a worker that polls the work pool and executes its flow runs:
prefect worker start --pool my-work-pool
Transform function to flow
Create a folder named code.
In this folder, create two files, each containing one function that will be turned into a flow.
| file 1: code/flow1.py | file 2: code/flow2.py |
|---|---|
| | |
With these files in place, you can run each flow directly with the commands python code/flow1.py and python code/flow2.py.
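As a minimal sketch of what code/flow1.py might contain: the function name flow1, its parameter, and the helper make_greeting are assumptions made for illustration, not the original listing. The only Prefect-specific part is the @flow decorator, which turns an ordinary function into a flow.

```python
try:
    from prefect import flow
except ImportError:
    # Fallback so the sketch still runs as a plain script where Prefect is missing.
    def flow(fn):
        return fn


def make_greeting(name: str) -> str:
    """Plain helper, kept separate from the flow so it is easy to test."""
    return f"Hello, {name}!"


@flow
def flow1(name: str = "world") -> str:
    """Hypothetical flow: name and body are assumptions for illustration."""
    message = make_greeting(name)
    print(message)
    return message


if __name__ == "__main__":
    # Calling the decorated function runs it as a flow,
    # which is what `python code/flow1.py` would execute.
    flow1()
```

A second file, code/flow2.py, would presumably follow the same pattern with its own decorated function.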
Deploy flow on work pool
Now we will deploy the flows on the work pool.
To simplify deployment, we describe both flows in a single YAML configuration file.
Create a file named prefect.yaml with the following content:
# Generic metadata about this project
name: test_flows_deployment
prefect-version: 3.1.0
# build section allows you to manage and build docker images
build: null
# push section allows you to manage if and how this project is uploaded to remote locations
push: null
# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /path/to/your/project
# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: flow1
  version: null
  tags: []
  concurrency_limit: null
  description: null
  # note: Prefect normally expects the form path/to/file.py:flow_function_name here
  entrypoint: code/flow1.py
  parameters: {}
  work_pool:
    name: my-work-pool
    work_queue_name: null
    job_variables: {}
  enforce_parameter_schema: true
  schedules: []
- name: flow2
  version: null
  tags: []
  concurrency_limit: null
  description: null
  entrypoint: code/flow2.py
  parameters: {}
  work_pool:
    name: my-work-pool
    work_queue_name: null
    job_variables: {}
  enforce_parameter_schema: true
  schedules: []
To deploy every flow defined in the YAML file, run the following command:
prefect deploy --all
To deploy only selected deployments, pass one --name option per deployment:
prefect deploy --name flow1 --name flow2
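The two entries under deployments differ only in their name and entrypoint, so the repeated block can be generated rather than copied by hand. The sketch below is one possible way to do that; the helper deployment_entry and the ":flow1" / ":flow2" function names in the entrypoints are assumptions, not part of Prefect or of the original files.

```python
import json


def deployment_entry(name: str, entrypoint: str, pool: str = "my-work-pool") -> dict:
    """Build one item of the `deployments` list used in prefect.yaml."""
    return {
        "name": name,
        "entrypoint": entrypoint,
        "parameters": {},
        "work_pool": {"name": pool, "work_queue_name": None, "job_variables": {}},
        "enforce_parameter_schema": True,
        "schedules": [],
    }


# Hypothetical entrypoints: the function names after ":" are assumptions.
deployments = [
    deployment_entry("flow1", "code/flow1.py:flow1"),
    deployment_entry("flow2", "code/flow2.py:flow2"),
]

if __name__ == "__main__":
    # YAML parsers generally accept JSON syntax, so this output
    # can be used as the value of the `deployments:` key.
    print(json.dumps(deployments, indent=2))
```

Adding a third flow then means adding one more deployment_entry call instead of another fifteen lines of YAML.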
Trigger flow via API
Now we can trigger the flows via API.
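import asyncio

from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment

# Deployments are addressed as "<flow name>/<deployment name>".
# Assumption: the flow defined in code/flow1.py is also named "flow1".
DEPLOYMENT = "flow1/flow1"


# with the client
async def call_with_client():
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(DEPLOYMENT)
        await client.create_flow_run_from_deployment(deployment.id)


# OR with the deployment helper (by default it waits for the run to finish)
async def call_with_run_deployment():
    await run_deployment(DEPLOYMENT)


if __name__ == "__main__":
    asyncio.run(call_with_client())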
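The Python helpers above talk to the server's REST API, so a deployment can also be triggered over plain HTTP from any language. The following is a rough sketch using only the standard library. It assumes the default local server address http://127.0.0.1:4200/api and a flow named flow1; lookup_request, create_run_request, and trigger are hypothetical helper names introduced here.

```python
import json
import urllib.request
from typing import Optional

# Assumption: a local server started with `prefect server start` listens here.
API_URL = "http://127.0.0.1:4200/api"


def lookup_request(flow_name: str, deployment_name: str) -> urllib.request.Request:
    """GET request that resolves a deployment by flow name and deployment name."""
    url = f"{API_URL}/deployments/name/{flow_name}/{deployment_name}"
    return urllib.request.Request(url, method="GET")


def create_run_request(
    deployment_id: str, parameters: Optional[dict] = None
) -> urllib.request.Request:
    """POST request that asks the server to create a flow run for a deployment."""
    url = f"{API_URL}/deployments/{deployment_id}/create_flow_run"
    body = json.dumps({"parameters": parameters or {}}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, method="POST", headers={"Content-Type": "application/json"}
    )


def trigger(flow_name: str, deployment_name: str) -> str:
    """Look up the deployment, create a flow run, and return the new run's id."""
    with urllib.request.urlopen(lookup_request(flow_name, deployment_name)) as resp:
        deployment_id = json.load(resp)["id"]
    with urllib.request.urlopen(create_run_request(deployment_id)) as resp:
        return json.load(resp)["id"]
```

With the server and a worker running, calling trigger("flow1", "flow1") should schedule a run that the worker on my-work-pool then picks up.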