Skip to content

jarojasm95/kueue

Repository files navigation

Kueue

pypi pypi CI style license

Python Distributed Task Queue - backed by Kafka

Motivation

Kueue was born out of the lack of a library that leverages Kafka for distributed task processing

Features

  • simple
  • intuitive api
  • extensible

Code Example

import time
from kueue import task, TaskExecutorConsumer, KueueConfig

KueueConfig(
    kafka_bootstrap='localhost:9092'
)

@task(topic="my-topic")
def sleepy_task(sleep: int):
    time.sleep(sleep)
    print("done sleeping", sleep)
    return sleep

sleepy_task.enqueue(args=(15,))

consumer = TaskExecutorConsumer(["my-topic"])
consumer.start()
# prints "done sleeping, 15"

Installation

pip install kueue

Development

Install poetry and run poetry install at the root of the repository. This should create a virtual environment with all the necessary python dependencies.

Tests

The test framework makes heavy use of pytest fixtures in order to spin up full integration environment consisting of a kubernetes cluster using kind and pytest-kind and kafka using strimzi

# unit tests
pytest
# unit tests + integration tests
pytest --integration

License

MIT © Jose Rojas