A collection of examples demonstrating Apache Flink™'s Python API (PyFlink), updated to use modern APIs and run within a self-contained Docker environment.
These examples primarily use the PyFlink Table API, showcasing common patterns for batch processing.
Running PyFlink applications typically requires a Java runtime environment, Python, and specific dependencies (like PyFlink itself) to be available to the Flink cluster nodes. To simplify setup and ensure consistency, this project uses Docker Compose to manage a local Flink cluster built from a custom Dockerfile.
Here's the workflow:
- You run `docker compose up --build` (handled implicitly by `make start-flink` if the image doesn't exist yet) to build a custom Flink Docker image from the included `Dockerfile`. This image installs Python 3 and the necessary Python packages (`apache-flink`, `numpy`) on top of the official Flink image.
- Docker Compose then starts Flink JobManager and TaskManager containers from this custom image.
- The project directory is mounted as a volume inside these containers, making your Python scripts accessible to Flink.
- You run `make run`, which executes `flink run` commands inside the JobManager container.
- The `flink run` command submits your Python scripts (`.py` files) to the Flink cluster.
- Flink executes the Python scripts using the Python 3 environment built into the Docker image, potentially distributing tasks to the TaskManager(s).
- Any output printed by the Python scripts (like results) appears in the standard output logs of the Flink TaskManager containers.
This approach ensures the correct Java, Python, and Python dependencies are present and avoids configuration issues related to finding the Python executable.
Prerequisites:
- Docker and Docker Compose
- Python 3.6+ (optional, for local utilities)
- uv (optional, for local setup)
- Homebrew (optional, on macOS; used by the Makefile to install `uv` if it is not found)
`Dockerfile`:
- Starts from the official `flink:1.19.0-scala_2.12-java11` image.
- Installs `python3` and `python3-pip` using `apt-get`.
- Copies `requirements.txt`.
- Installs the Python dependencies (`apache-flink`, `numpy`) using `pip3`.
`docker-compose.yml`:
- Defines `jobmanager` and `taskmanager` services.
- Uses `build: .` to instruct Docker Compose to build the image from the `Dockerfile` in the current directory.
- Exposes port `8081` for the Flink Web UI.
- Sets basic Flink configuration.
- Mounts the current project directory (`.`) to `/opt/flink/usrlib` inside the containers.
- Connects the services via a `flink-network`.
Getting started:
- Clone the repository:
  git clone https://github.com/wdm0006/flink-python-examples.git
  cd flink-python-examples
- (Optional) Set up a local Python environment. This step is not needed if you only run the examples via Docker:
  make setup
  This uses `uv` to create a `.venv` and install the Python dependencies locally.
- Build the image and start the Flink cluster. The first time you run this, Docker Compose builds the image defined in the `Dockerfile`; subsequent runs reuse the existing image unless the `Dockerfile` or its build context changes:
  make start-flink
  # Or directly: docker compose up -d --build
You can access the Flink Web UI at http://localhost:8081.
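Before submitting jobs, it can help to wait until a TaskManager has registered with the JobManager. The sketch below uses only the Python standard library and Flink's REST API, which is served on the same port 8081 as the Web UI. It polls the `/overview` endpoint, whose `taskmanagers` and `slots-total` fields report registered TaskManagers and task slots. The helper names and polling interval are illustrative assumptions, not part of this repository.

```python
import json
import time
import urllib.request

# REST API root; docker-compose.yml exposes port 8081, shared by the Web UI and REST API.
FLINK_REST = "http://localhost:8081"

def fetch_json(path, base=FLINK_REST, timeout=5):
    """GET a Flink REST endpoint and decode its JSON body."""
    with urllib.request.urlopen(base + path, timeout=timeout) as resp:
        return json.load(resp)

def cluster_ready(overview):
    """True once at least one TaskManager is registered and offers a task slot."""
    return overview.get("taskmanagers", 0) >= 1 and overview.get("slots-total", 0) >= 1

def wait_for_cluster(attempts=30, delay=2.0):
    """Poll /overview until the cluster looks ready; return False after `attempts` tries."""
    for _ in range(attempts):
        try:
            if cluster_ready(fetch_json("/overview")):
                return True
        except OSError:
            # Connection refused or timed out (urllib.error.URLError subclasses OSError).
            pass
        time.sleep(delay)
    return False
```

Calling `wait_for_cluster()` right after `make start-flink` should return `True` once the TaskManager container is up.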
With the Flink cluster running, submit the example jobs:
make run
This command runs `docker compose exec jobmanager flink run -py <script_path_in_container>` for each example script. Flink uses the Python 3 environment built into the Docker image.
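The submission step can also be scripted from the host. The sketch below assumes the `/opt/flink/usrlib` mount from `docker-compose.yml` and builds the same `docker compose exec jobmanager flink run -py ...` command described above. The helper names are hypothetical. If you run it without an interactive terminal (for example in CI), you may need `docker compose exec -T` to disable TTY allocation.

```python
import subprocess
from pathlib import PurePosixPath

# Mount point of the project directory inside the containers (see docker-compose.yml).
CONTAINER_ROOT = PurePosixPath("/opt/flink/usrlib")

def flink_run_command(script):
    """Build the command that submits one PyFlink script through the JobManager.

    `script` must be a path relative to the project root.
    """
    container_path = CONTAINER_ROOT / PurePosixPath(script)
    return [
        "docker", "compose", "exec", "jobmanager",
        "flink", "run", "-py", str(container_path),
    ]

def submit(script):
    """Run the command on the host; raises CalledProcessError on a non-zero exit."""
    subprocess.run(flink_run_command(script), check=True)
```

For example, `submit("template_example/application.py")` would submit the template job, assuming the cluster is running.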
Output: Check the Flink Web UI (http://localhost:8081) for running/completed jobs. View the stdout logs of the TaskManager(s) to see printed results.
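The job list shown in the Web UI is also available from the REST endpoint `/jobs/overview`. It returns a `jobs` array whose entries include fields such as `name` and `state` (for example `RUNNING`, `FINISHED` or `FAILED`). The standard-library sketch below summarizes that response. The function names are illustrative assumptions.

```python
import json
import urllib.request
from collections import Counter

def fetch_jobs(base="http://localhost:8081", timeout=5):
    """Return the decoded JSON body of GET /jobs/overview."""
    with urllib.request.urlopen(base + "/jobs/overview", timeout=timeout) as resp:
        return json.load(resp)

def summarize_jobs(payload):
    """Count jobs per state and collect the names of any FAILED jobs."""
    jobs = payload.get("jobs", [])
    states = Counter(job["state"] for job in jobs)
    failed = [job["name"] for job in jobs if job["state"] == "FAILED"]
    return states, failed
```

After `make run`, `states, failed = summarize_jobs(fetch_jobs())` gives a quick pass/fail check without opening the browser.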
To submit a single example (e.g., word count):
make submit_word_count
To stop the Flink cluster:
make stop-flink
# Or directly: docker compose down
Included examples:
- Word Count: Counts word occurrences in a predefined string.
- Trending Hashtags: Generates sample "tweets", extracts hashtags, and counts their frequency.
- Data Enrichment: Reads sample JSON data and a CSV dimension table, joins them based on an attribute, and outputs the enriched data.
- Mean Values: Generates sample floating-point data and calculates the mean of each column.
- Mandelbrot Set: Generates candidate complex numbers and identifies points within the Mandelbrot set.
- Template Example: A basic skeleton (`template_example/application.py`) demonstrating the structure for a new PyFlink Table API job.
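The Mandelbrot example's membership check is presumably a variant of the standard escape-time test. That test iterates z = z*z + c from z = 0 and treats c as inside the set if |z| stays at or below 2 for a fixed number of iterations. The plain-Python sketch below can be used to spot-check individual points from the job's output. It is not the repository's PyFlink code, and `max_iter=100` is an assumed cutoff.

```python
def in_mandelbrot(c, max_iter=100):
    """Escape-time test for a complex candidate c.

    Iterates z = z*z + c starting from z = 0. Once |z| exceeds 2 the orbit is
    guaranteed to diverge, so c is outside the set. If that never happens within
    max_iter steps, c is treated as inside (an approximation near the boundary).
    """
    z = 0j
    for _ in range(max_iter):
        z = z * z + c
        if abs(z) > 2:
            return False
    return True

def mandelbrot_points(candidates, max_iter=100):
    """Keep only the candidates that pass the escape-time test."""
    return [c for c in candidates if in_mandelbrot(c, max_iter)]
```

Points close to the boundary may need a larger `max_iter` to classify reliably, so results can differ from the job's if it uses a different cutoff.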
To remove the local virtual environment (if created) and stop/remove the Flink cluster containers:
make clean
Apache®, Apache Flink™, Flink™, and the Apache feather logo are trademarks of The Apache Software Foundation.